文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Canal+Kafka实现Mysql数据同步

2023-09-05 18:31

关注

Canal介绍

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。

目前canal主要支持mysql数据库。

github地址:https://github.com/alibaba/canal

版本下载地址:https://github.com/alibaba/canal/releases

文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal应用场景

1)、电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2)、价格、库存发生变更实时同步到redis;
3)、数据库异地备份、数据同步;
4)、代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

image.png

MySQL主从复制原理

1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

Canal工作原理

image.pngCanal安装

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

Canal配置

mq相关参数说明 (>=1.1.5版本)

在1.1.5版本开始,引入了MQ Connector设计,参数配置做了部分调整

参数名

参数说明

默认值

canal.aliyun.accessKey

阿里云ak

canal.aliyun.secretKey

阿里云sk

canal.aliyun.uid

阿里云uid

canal.mq.flatMessage

是否为json格式 如果设置为false,对应MQ收到的消息为protobuf格式 需要通过CanalMessageDeserializer进行解码

false

canal.mq.canalBatchSize

获取canal数据的批次大小

50

canal.mq.canalGetTimeout

获取canal数据的超时时间

100

canal.mq.accessChannel = local

是否为阿里云模式,可选值local/cloud

local

canal.mq.database.hash

是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算

true

canal.mq.send.thread.size

MQ消息发送并行度

30

canal.mq.build.thread.size

MQ消息构建并行度

8

kafka.bootstrap.servers

kafka服务端地址

127.0.0.1:9092

kafka.acks

kafka为ProducerConfig.ACKS_CONFIG

all

kafka.compression.type

压缩类型

none

kafka.batch.size

kafka为ProducerConfig.BATCH_SIZE_CONFIG

16384

kafka.linger.ms

kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200

1

kafka.max.request.size

kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG

1048576

kafka.buffer.memory

kafka为ProducerConfig.BUFFER_MEMORY_CONFIG

33554432

kafka.max.in.flight.requests.per.connection

kafka为ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

1

kafka.retries

发送失败重试次数

0

kafka.kerberos.enable

kerberos认证

false

kafka.kerberos.krb5.file

kerberos认证

../conf/kerberos/krb5.conf

kafka.kerberos.jaas.file

kerberos认证

../conf/kerberos/jaas.conf

rocketmq.producer.group

rocketMQ为ProducerGroup名

test

rocketmq.enable.message.trace

是否开启message trace

false

rocketmq.customized.trace.topic

message trace的topic

rocketmq.namespace

rocketmq的namespace

rocketmq.namesrv.addr

rocketmq的namesrv地址

127.0.0.1:9876

rocketmq.retry.times.when.send.failed

重试次数

0

rocketmq.vip.channel.enabled

rocketmq是否开启vip channel

false

rocketmq.tag

rocketmq的tag配置

空值

rabbitmq.host

rabbitMQ配置

rabbitmq.virtual.host

rabbitMQ配置

rabbitmq.exchange

rabbitMQ配置

rabbitmq.username

rabbitMQ配置

rabbitmq.password

rabbitMQ配置

rabbitmq.deliveryMode

rabbitMQ配置

pulsarmq.serverUrl

pulsarmq配置

pulsarmq.roleToken

pulsarmq配置

pulsarmq.topicTenantPrefix

pulsarmq配置

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的动态topic规则, 1.1.3版本支持

canal.mq.partition

单队列模式的分区下标,

1

canal.mq.enableDynamicQueuePartition

动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ

false

canal.mq.partitionsNum

散列模式的分区数

canal.mq.dynamicTopicPartitionNum

mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum

canal.mq.partitionHash

散列规则定义 库名.表名 : 唯一主键,比如mytest.person: id 1.1.3版本支持新语法,见下文

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

其他详细参数可参考Canal AdminGuide

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. 1.

    canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择

  2. 2.

    canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区

  1. 1.

    canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:

性能表现

Kafka + 混合DML场景测试

场景

1个topic + 单分区

1个topic+3分区

2个topic+1分区

2个topic+3分区

不开启flatMessage

6k rps (9.71k tps)

54k rps (6.53k tps)

6k rps (7.9k tps)

8k rps (5.71k tps)

开启flatMessage

79k rps (4.36k tps)

97 rps (5.94k tps)

91k rps (4.45k tps)

96k rps (6.26k tps)

Kafka + 单表的batch insert场景测试

场景

1个topic + 单分区

1个topic+3分区

不开启flatMessage

6k rps

1k rps

开启flatMessage

3k rps

6k rps


RocketMQ + 混合DML场景测试

场景

1个topic + 单分区

1个topic+3分区

2个topic+1分区

2个topic+3分区

不开启flatMessage

6k rps (10.71k tps)

3k rps (8.59k tps)

7k rps (9.46k tps)

7k rps (7.66k tps)

开启flatMessage

75k rps (6.17k tps)

96k rps (5.55k tps)

83k rps (6.63k tps)

93k rps (6.26k tps)

RocketMQ + 单表的batch insert场景测试

场景

1个topic + 单分区

1个topic+3分区

不开启flatMessage

2k rps

3k rps

开启flatMessage

6k rps

9k rps

附录:

canal官方文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

Canal+MQ性能表现:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance

参考文档:https://www.cnblogs.com/zwh0910/p/17043265.html

来源地址:https://blog.csdn.net/shadow_zed/article/details/132209818

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯