基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal
本文将对canal的启动模块deployer进行分析。
Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.
模块内的类如下:
为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。
- CanalServer启动过程中配置如何加载?
- CanalServer启动过程中涉及哪些组件?
- 集群模式的canalServer,是如何实现instance的HA呢?
- 每个canalServer又是怎么获取admin上的配置变更呢?
这个类是整个canal-server的入口类。负责配置加载和启动canal-server。
主流程如下:
- 加载canal.properties的配置内容
- 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
- 如果是admin控制,使用PlainCanalConfigClient获取远程配置 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
- 核心是用canalStarter.start()启动
- 使用CountDownLatch保持主线程存活
- 收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出
public static void main(String[] args) {
try {
//note:设置全局未捕获异常的处理
setGlobalUncaughtExceptionHandler();
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
final CanalStarter canalStater = new CanalStarter(properties);
String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
if (StringUtils.isNotEmpty(managerAddress)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
//省略一部分。。。。。。
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort),
autoRegister,
autoCluster);
PlainCanal canalConfig = configClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
+ " can"t not found config for [" + registerIp + ":" + adminPort
+ "]");
}
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
executor.scheduleWithFixedDelay(new Runnable() {
private PlainCanal lastCanalConfig;
public void run() {
try {
if (lastCanalConfig == null) {
lastCanalConfig = configClient.findServer(null);
} else {
PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
if (newCanalConfig != null) {
// 远程配置canal.properties修改重新加载整个应用
canalStater.stop();
Properties managerProperties = newCanalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
canalStater.setProperties(managerProperties);
canalStater.start();
lastCanalConfig = newCanalConfig;
}
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
canalStater.setProperties(managerProperties);
} else {
canalStater.setProperties(properties);
}
canalStater.start();
//note: 这样用CDL处理和while(true)有点类似
runningLatch.await();
executor.shutdownNow();
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal Server:", e);
}
}
从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。
这里先对三个对象进行辨析:
- CanalController:是canalServer真正的启动控制器
- canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
- CanalAdminWithNetty:这个不是admin控制台,而是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等
start方法主要逻辑如下:
- 根据配置的serverMode,决定使用CanalMQProducer或者canalServerWithNetty
- 启动CanalController
- 注册shutdownHook
- 如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息投递给mq)
- 启动CanalAdminWithNetty做服务器
public synchronized void start() throws Throwable {
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (!"tcp".equalsIgnoreCase(serverMode)) {
ExtensionLoader loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
if (canalMQProducer != null) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
canalMQProducer.init(properties);
Thread.currentThread().setContextClassLoader(cl);
}
}
//note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)
if (canalMQProducer != null) {
MQProperties mqProperties = canalMQProducer.getMqProperties();
// disable netty
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
if (mqProperties.isFlatMessage()) {
// 设置为raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
}
}
controller = new CanalController(properties);
//note 2.启动canalController
controller.start();
//note 3.注册了一个shutdownHook,系统退出时执行相关逻辑
shutdownThread = new Thread() {
public void run() {
try {
controller.stop();
//note 主线程退出
CanalLauncher.runningLatch.countDown();
} catch (Throwable e) {
} finally {
}
}
};
Runtime.getRuntime().addShutdownHook(shutdownThread);
//note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(destinations);
controller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
//note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器
if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
canalAdminWithNetty.setPort(Integer.parseInt(port));
canalAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;
}
前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。
这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。
3.1 从构造器开始了解
整体初始化的顺序如下:
- 构建PlainCanalConfigClient,用于用户远程配置的获取
- 初始化全局配置,顺便把instance相关的全局配置初始化一下
- 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
- 初始化zkClient
- 初始化ServerRunningMonitors,作为instance 运行节点控制
- 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)
这里有几个机制要详细介绍一下。
3.1.1 CanalServer两种模式
canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。
在构造器中初始化代码部分如下:
// 3.准备canal server
//note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq
// 是不需要这个netty的)
ip = getProperty(properties, CanalConstants.CANAL_IP);
//省略一部分。。。
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
//省略一部分。。。
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
}
embededCanalServer:类型为CanalServerWithEmbedded
canalServer:类型为CanalServerWithNetty
二者有什么区别呢?
都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。
关于这两种类型的实现,canal官方文档有以下描述:
说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。
如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。
在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。
因此,在构造器中,我们看到,
用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,
而ip和port被设置到CanalServerWithNetty中。
关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。
3.1.2 ServerRunningMonitor
在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。
ServerRunningMonitor是做什么的呢?
我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。
public class ServerRunningMonitor extends AbstractCanalLifeCycle {
private static final Logger logger = LoggerFactory.getLogger(ServerRunningMonitor.class);
private ZkClientx zkClient;
private String destination;
private IZkDataListener dataListener;
private BooleanMutex mutex = new BooleanMutex(false);
private volatile boolean release = false;
// 当前服务节点状态信息
private ServerRunningData serverData;
// 当前实际运行的节点状态信息
private volatile ServerRunningData activeData;
private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
private int delayTime = 5;
private ServerRunningListener listener;
public ServerRunningMonitor(ServerRunningData serverData){
this();
this.serverData = serverData;
}
//。。。。。
}
在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。
ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。
主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。
具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。
new Function<String, ServerRunningMonitor>() {
public ServerRunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {
public void processActiveEnter() {
//省略具体内容。。。
}
public void processActiveExit() {
//省略具体内容。。。
}
public void processStart() {
//省略具体内容。。。
}
public void processStop() {
//省略具体内容。。。
}
});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// 触发创建一下cid节点
runningMonitor.init();
return runningMonitor;
}
}
3.2 canalController的start方法
具体运行逻辑如下:
- 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
- 先启动embededCanalServer(会启动对应的监控)
- 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
- 如果cannalServer不为空,启动canServer (canalServerWithNetty)
这里需要注意,canalServer什么时候为空?
如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。
所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。
public void start() throws Throwable {
// 创建整个canal的工作节点
final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception{
logger.error("failed to connect to zookeeper", error);
}
});
}
// 先启动embeded服务
embededCanalServer.start();
// 尝试启动一下非lazy状态的通道
for (Map.Entry entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// 创建destination的工作节点
if (!embededCanalServer.isStart(destination)) {
// HA机制启动
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
//note:为每个instance注册一个配置监视器
if (autoScan) {
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}
}
if (autoScan) {
//note:启动线程定时去扫描配置
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
//note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();
}
}
}
// 启动网络接口
if (canalServer != null) {
canalServer.start();
}
}
我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。
入口在runningMonitor.start()。
- 如果zkClient != null,就用zk进行HA启动
- 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
public synchronized void start() {
super.start();
try {
processStart();
if (zkClient != null) {
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter();// 没有zk,直接启动
}
} catch (Exception e) {
logger.error("start failed", e);
// 没有正常启动,重置一下状态,避免干扰下一次start
stop();
}
}
重点关注下HA启动方式,一般 我们都采用这种模式进行。
在集群模式下,可能会有多个canal server共同处理同一个destination,
在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。
同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!
启动的重点还是在initRuning()。
利用zk来保证集群中有且只有 一个instance任务在运行。
- 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
- 尝试创建临时节点。
- 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
- 如果创建成功,就说明没有其他server启动这个instance,可以创建
private void initRunning() {
if (!isStart()) {
return;
}
//note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// 序列化
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter();// 触发一下事件
mutex.set(true);
release = false;
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在节点,立即尝试一次
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
initRunning();
}
}
那运行中的HA是如何实现的呢,我们回头看一下
zkClient.subscribeDataChanges(path, dataListener);
对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。
dataListener是在ServerRunningMonitor的构造方法中初始化的,
包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :
public ServerRunningMonitor(){
// 创建父节点
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
releaseRunning();// 彻底释放mainstem
}
activeData = (ServerRunningData) runningData;
}
public interface InstanceAction {
void start(String destination);
void release(String destination);
void stop(String destination);
void reload(String destination);
}
具体实现在canalController的构造器中实现了匿名类。
4.2 InstanceConfigMonitor
这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。
我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。
原理很简单。
- 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
- 然后通过defaultAction去start
- 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。
public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
private static final Logger logger = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);
private long scanIntervalInSecond = 5;
private InstanceAction defaultAction = null;
private Map actions = new MapMaker().makeMap();
private Map configs = MigrateMap.makeComputingMap(new Function() {
public PlainCanal apply(String destination) {
return new PlainCanal();
}
});
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("canal-instance-scan"));
private volatile boolean isFirst = true;
private PlainCanalConfigClient configClient;
//…
}
deployer模块的主要作用:
1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。
2)确定canal-server的启动方式:独立启动或者集群方式启动
3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA
4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置
5)启动canal server,监听客户端请求
这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。
看到这里了,原创不易,点个关注、点个赞吧,你最好看了~
文章持续更新,可以微信搜索「阿丸笔记 」第一时间阅读,回复关键字【学习】有我准备的一线大厂面试资料。
知识碎片重新梳理,构建Java知识图谱:https://github.com/saigu/JavaKnowledgeGraph(历史文章查阅非常方便)