文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

2024-12-02 11:47

关注

本文转载自微信公众号「码猿技术专栏」,作者不才陈某 。转载本文请联系码猿技术专栏公众号。

数据同步一直是一个令人头疼的问题。在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方。

但是随着业务量增大,数据量变多以及各种复杂场景下的分库分表的实现,使数据同步变得越来越困难。

今天这篇文章使用阿里开源的中间件Canal解决数据增量同步的痛点。

文章目录如下:

Canal是什么?

canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

从这句话理解到了什么?

基于MySQL,并且通过MySQL日志进行的增量解析,这也就意味着对原有的业务代码完全是无侵入性的。

工作原理:解析MySQL的binlog日志,提供增量数据。

基于日志增量订阅和消费的业务包括

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官方文档:https://github.com/alibaba/canal

Canal数据如何传输?

先来一张官方图:

Canal分为服务端和客户端,这也是阿里常用的套路,比如前面讲到的注册中心Nacos:

目前为止支持的消息中间件很全面了,比如Kafka、RocketMQ,RabbitMQ。

数据同步还有其他中间件吗?

有,当然有,还有一些开源的中间件也是相当不错的,比如Bifrost。

常见的几款中间件的区别如下:

当然要我选择的话,首选阿里的中间件Canal。

Canal服务端安装

服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases

目前最新的是v1.1.5,点击下载:

下载完成解压,目录如下:

本文使用Canal+RabbitMQ进行数据的同步,因此下面步骤完全按照这个base进行。

1、打开MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

  1. [mysqld] 
  2. log-bin=mysql-bin # 开启 binlog 
  3. binlog-format=ROW # 选择 ROW 模式 
  4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 

2、设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。

一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

  1. # url 
  2. canal.instance.master.address=127.0.0.1:3306 
  3. # username/password 
  4. canal.instance.dbUsername=root 
  5. canal.instance.dbPassword=root 
  6. # 监听的数据库 
  7. canal.instance.defaultDatabaseName=test 
  8.  
  9. # 监听的表,可以指定,多个用逗号分割,这里正则是监听所有 
  10. canal.instance.filter.regex=.*\\..* 

3、设置RabbitMQ的配置

服务端默认的传输方式是tcp,需要在配置文件中设置MQ的相关信息。

这里需要修改两处配置文件,如下;

1)、canal.deployer-1.1.5\conf\canal.properties

这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码...

  1. # 传输方式:tcp, kafka, rocketMQ, rabbitMQ 
  2. canal.serverMode = rabbitMQ 
  3. ################################################## 
  4. #########           RabbitMQ         ############# 
  5. ################################################## 
  6. rabbitmq.host = 127.0.0.1 
  7. rabbitmq.virtual.host =/ 
  8. # exchange 
  9. rabbitmq.exchange =canal.exchange 
  10. # 用户名、密码 
  11. rabbitmq.username =guest 
  12. rabbitmq.password =guest 
  13. ## 是否持久化 
  14. rabbitmq.deliveryMode = 2 

2)、canal.deployer-1.1.5\conf\example\instance.properties

这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

  1. canal.mq.topic=canal.routing.key 

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一个canal.exchange(必须和配置中的相同)的exchange和一个名称为 canal.queue(名称随意)的队列。

其中绑定的路由KEY为:canal.routing.key(必须和配置中的相同),如下图:

5、启动服务端

点击bin目录下的脚本,windows直接双击startup.bat,启动成功如下:

6、测试

在本地数据库test中的oauth_client_details插入一条数据,如下:

  1. INSERT INTO `oauth_client_details` VALUES ('myjszl''res1''$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W''all''password,refresh_token,authorization_code,client_credentials,implicit''http://www.baidu.com'NULL, 1000, 1000, NULL'false'); 

此时查看MQ中的canal.queue已经有了数据,如下:

其实就是一串JSON数据,这个JSON如下:

  1.  "data": [{ 
  2.   "client_id""myjszl"
  3.   "resource_ids""res1"
  4.   "client_secret""$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W"
  5.   "scope""all"
  6.   "authorized_grant_types""password,refresh_token,authorization_code,client_credentials,implicit"
  7.   "web_server_redirect_uri""http://www.baidu.com"
  8.   "authorities"null
  9.   "access_token_validity""1000"
  10.   "refresh_token_validity""1000"
  11.   "additional_information"null
  12.   "autoapprove""false" 
  13.  }], 
  14.  "database""test"
  15.  "es": 1640337532000, 
  16.  "id": 7, 
  17.  "isDdl"false
  18.  "mysqlType": { 
  19.   "client_id""varchar(48)"
  20.   "resource_ids""varchar(256)"
  21.   "client_secret""varchar(256)"
  22.   "scope""varchar(256)"
  23.   "authorized_grant_types""varchar(256)"
  24.   "web_server_redirect_uri""varchar(256)"
  25.   "authorities""varchar(256)"
  26.   "access_token_validity""int(11)"
  27.   "refresh_token_validity""int(11)"
  28.   "additional_information""varchar(4096)"
  29.   "autoapprove""varchar(256)" 
  30.  }, 
  31.  "old"null
  32.  "pkNames": ["client_id"], 
  33.  "sql"""
  34.  "sqlType": { 
  35.   "client_id": 12, 
  36.   "resource_ids": 12, 
  37.   "client_secret": 12, 
  38.   "scope": 12, 
  39.   "authorized_grant_types": 12, 
  40.   "web_server_redirect_uri": 12, 
  41.   "authorities": 12, 
  42.   "access_token_validity": 4, 
  43.   "refresh_token_validity": 4, 
  44.   "additional_information": 12, 
  45.   "autoapprove": 12 
  46.  }, 
  47.  "table""oauth_client_details"
  48.  "ts": 1640337532520, 
  49.  "type""INSERT" 

每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值.....

客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

Canal客户端搭建

客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue这个队列。

1、创建消息实体类

MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

  1.  
  2. @NoArgsConstructor 
  3. @Data 
  4. public class CanalMessage { 
  5.     @JsonProperty("type"
  6.     private String type; 
  7.  
  8.     @JsonProperty("table"
  9.     private String table
  10.  
  11.     @JsonProperty("data"
  12.     private List data; 
  13.  
  14.     @JsonProperty("database"
  15.     private String database
  16.  
  17.     @JsonProperty("es"
  18.     private Long es; 
  19.  
  20.     @JsonProperty("id"
  21.     private Integer id; 
  22.  
  23.     @JsonProperty("isDdl"
  24.     private Boolean isDdl; 
  25.  
  26.     @JsonProperty("old"
  27.     private List old; 
  28.  
  29.     @JsonProperty("pkNames"
  30.     private List pkNames; 
  31.  
  32.     @JsonProperty("sql"
  33.     private String sql; 
  34.  
  35.     @JsonProperty("ts"
  36.     private Long ts; 

2、MQ消息监听业务

接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。

代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:

  1. import cn.hutool.json.JSONUtil; 
  2. import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage; 
  3. import lombok.RequiredArgsConstructor; 
  4. import lombok.extern.slf4j.Slf4j; 
  5. import org.springframework.amqp.rabbit.annotation.Exchange; 
  6. import org.springframework.amqp.rabbit.annotation.Queue; 
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding; 
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener; 
  9. import org.springframework.stereotype.Component; 
  10.  
  11.  
  12. @Component 
  13. @Slf4j 
  14. @RequiredArgsConstructor 
  15. public class CanalRabbitMQListener { 
  16.  
  17.     @RabbitListener(bindings = { 
  18.             @QueueBinding( 
  19.                     value = @Queue(value = "canal.queue", durable = "true"), 
  20.                     exchange = @Exchange(value = "canal.exchange"), 
  21.                     key = "canal.routing.key" 
  22.             ) 
  23.     }) 
  24.     public void handleDataChange(String message) { 
  25.         //将message转换为CanalMessage 
  26.         CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class); 
  27.         String tableName = canalMessage.getTable(); 
  28.         log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message); 
  29.         //TODO 业务逻辑自己完善............... 
  30.     } 

3、测试

下面向表中插入数据,看下接收的消息是什么样的,SQL如下:

  1. INSERT INTO `oauth_client_details` 
  2. VALUES 
  3.  ( 'myjszl''res1''$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W''all''password,refresh_token,authorization_code,client_credentials,implicit''http://www.baidu.com'NULL, 1000, 1000, NULL'false' ); 

客户端转换后的消息如下图:

上图可以看出所有的数据都已经成功接收到,只需要根据数据完善自己的业务逻辑即可。

客户端案例源码已经上传GitHub,关注公众号:码猿技术专栏,回复关键词:9530 获取!

总结

 

数据增量同步的开源工具并不只有Canal一种,根据自己的业务需要选择合适的组件。

 

来源: 码猿技术专栏内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯