文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

MySQL CDC技术方案梳理

2023-08-25 16:25

关注

  本篇主要探讨MySQL数据同步的各类常见技术方案及优劣势对比分析,从而更加深层次的理解方案,进而在后续的实际业务中,更好的选择方案。

  CDC即Change Data Capture,变更数据捕获,即当数据发生变更时,能够实时或准实时的捕获到数据的变化,以MySQL为例,产生数据变更的操作有insertupdatedelete。CDC技术就时在数据变更时,能够以安全、可靠的方式同步给其他服务、存储,如mongodb、es、kafka、redis、clickhouse等。

  目前一些常用的组件有alibaba canalapache flinkgo-mysql-transfer等。CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:

2.1 基于查询的 CDC

2.2 基于日志的 CDC

flink cdcDebeziumCanalSqoopKettleOracle GoldengateGo-mysql-transfer
CDC机制日志日志日志查询查询日志日志
增量同步
全量同步
断点续传
全量 + 增量
架构分布式单机单机分布式分布式分布式单机
Transformation⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️
生态⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️⭐️

如上图所示,需要根据实际业务场景,决定使用哪一种开源方案。

cdc,顾名思义,就是数据变更捕获,其本质是实时获取MySQL数据变更(增删改),进而同步其他服务或者业务方。因此其使用场景主要分为:

  1. 数据分发:将一个数据源的数据分发给多个下游业务系统,常用于业务解耦、微服务系统。
  2. 数据采集:面向数据仓库、数据湖的ETL数据集成,消除数据孤岛,便于后续的分析。
  3. 数据同步:常用于数据备份、容灾等。

5.1 开启MySQL的binlog

[mysqld]default-storage-engine=INNODBserver-id = 100`唯一`)port = 3306log-bin=mysql-bin (`开启`)binlog_format = ROW (`注意要设置为行模式`

开启之后,在MySQL的数据目录(/usr/local/mysql-8.0.32-macos13-arm64/data),就会生成相应的binlog文件

-rw-r-----    1 _mysql  _mysql      1867  6 12 00:03 mysql-bin.000001-rw-r-----    1 _mysql  _mysql      5740  6 18 20:55 mysql-bin.000002-rw-r-----    1 _mysql  _mysql        38  6 12 00:03 mysql-bin.index

查看binlog开启

5.2 创建canal同步账户及权限设置

mysql> CREATE USER canal IDENTIFIED BY 'canal';  mysql> GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';mysql> FLUSH PRIVILEGES;

6.1 canal同步kafka原理

原理等同于MySQL的主从复制,具体流程:

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)

6.2 canal安装与配置

具体配置请参考文章 https://www.cnblogs.com/Clera-tea/p/16517424.html

6.2.1 配置文件

/canal/conf/canal.properties

6.2.2 同步kafka配置

canal.serverMode = kafka
###########################################################                    Kafka                   ###############################################################kafka.bootstrap.servers = 127.0.0.1:9092 (本机kafka服务)kafka.acks = allkafka.compression.type = nonekafka.batch.size = 16384kafka.linger.ms = 1kafka.max.request.size = 1048576kafka.buffer.memory = 33554432kafka.max.in.flight.requests.per.connection = 1kafka.retries = 0kafka.kerberos.enable = falsekafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

6.2.3 binlog过滤设置

# binlog filter configcanal.instance.filter.druid.ddl = false(注意这里true 改成 false)canal.instance.filter.query.dcl = falsecanal.instance.filter.query.dml = falsecanal.instance.filter.query.ddl = falsecanal.instance.filter.table.error = falsecanal.instance.filter.rows = falsecanal.instance.filter.transaction.entry = falsecanal.instance.filter.dml.insert = falsecanal.instance.filter.dml.update = falsecanal.instance.filter.dml.delete = false

6.2.4 同步destinations设置

canal.destinations = example,mytopic(多个逗号分隔)

6.2.5 每个topic都有各自的实例配置

路径/conf/topicname/instance.properties
设置监听mysql地址

canal.instance.master.address=127.0.0.1:3306

配置mysql账户

canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8

配置canal同步到kafka topic信息

canal.mq.topic=mytopic

6.2.6 kafka数据接收

1 mysql2 zkServer start3 kafka-server-start /opt/homebrew/etc/kafka/server.properties4 canal/bin/startup.sh

kafka 消费者收到的消息如下

{    "data":[        {            "id":"22",            "url":"1",            "source":"d",            "status":"1",            "created_at":"2023-06-29 00:10:31",            "updated_at":"2023-06-29 00:10:31"        }    ],    "database":"finance",    "es":1687968631000,    "id":2,    "isDdl":false,    "mysqlType":{        "id":"int unsigned",        "url":"varchar(2048)",        "source":"varchar(32)",        "status":"tinyint",        "created_at":"datetime",        "updated_at":"datetime"    },    "old":null,    "pkNames":[        "id"    ],    "sql":"",    "sqlType":{        "id":4,        "url":12,        "source":12,        "status":-6,        "created_at":93,        "updated_at":93    },    "table":"f_collect",    "ts":1687968631537,    "type":"INSERT"}
{    "data":[        {            "id":"22",            "url":"1",            "source":"d",            "status":"100",            "created_at":"2023-06-29 00:10:31",            "updated_at":"2023-06-29 00:31:39"        }    ],    "database":"finance",    "es":1687969899000,    "id":3,    "isDdl":false,    "mysqlType":{        "id":"int unsigned",        "url":"varchar(2048)",        "source":"varchar(32)",        "status":"tinyint",        "created_at":"datetime",        "updated_at":"datetime"    },    "old":[        {            "status":"1",            "updated_at":"2023-06-29 00:10:31"        }    ],    "pkNames":[        "id"    ],    "sql":"",    "sqlType":{        "id":4,        "url":12,        "source":12,        "status":-6,        "created_at":93,        "updated_at":93    },    "table":"f_collect",    "ts":1687969899293,    "type":"UPDATE"}
{    "data":[        {            "id":"22",            "url":"1",            "source":"d",            "status":"100",            "created_at":"2023-06-29 00:10:31",            "updated_at":"2023-06-29 00:31:39"        }    ],    "database":"finance",    "es":1687969946000,    "id":4,    "isDdl":false,    "mysqlType":{        "id":"int unsigned",        "url":"varchar(2048)",        "source":"varchar(32)",        "status":"tinyint",        "created_at":"datetime",        "updated_at":"datetime"    },    "old":null,    "pkNames":[        "id"    ],    "sql":"",    "sqlType":{        "id":4,        "url":12,        "source":12,        "status":-6,        "created_at":93,        "updated_at":93    },    "table":"f_collect",    "ts":1687969946443,    "type":"DELETE"}

7.1 基本说明

项目github地址:go-mysql-transfer

  1. 简单,不依赖其它组件,一键部署
  2. 集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,无需编写客户端,开箱即用
  3. 内置丰富的数据解析、消息生成规则、模板语法
  4. 支持Lua脚本扩展,可处理复杂逻辑
  5. 集成Prometheus客户端,支持监控告警
  6. 集成Web Admin监控页面
  7. 支持高可用集群部署
  8. 数据同步失败重试
  9. 支持全量数据初始化

7.2 原理

  1. 将自己伪装为MySQL的Slave监听binlog,获取binlog的变更数据
  2. 根据规则或者lua脚本解析数据,生成指定格式的消息
  3. 将生成的消息批量发送给接收端

7.3 安装

1、依赖Golang 1.14 及以上版本2、设置' GO111MODULE=on '3、拉取源码 ' git clone https://github.com/wj596/go-mysql-transfer.git '4、进入目录,执行 ' go build ' 编译

7.4 全量数据同步

./go-mysql-transfer -stock

7.5 配置文件app.yaml

都能看懂,不做详细说明,主要配置项

1. mysql2. target (kafka)3. kafka配置4. rule4.1 数据库,表,字段4.2 lua_file_path: lua/sync.lua 可以只配置基本的数据格式,也可以配置lua脚本来调整数据格式4.3 kafka topic
# mysql配置addr: 127.0.0.1:3306user: #mysql用户名pass: #mysql密码charset : utf8slave_id: 1001 #slave IDflavor: mysql #mysql or mariadb,默认mysql#系统相关配置#data_dir: D:\\transfer #应用产生的数据存放地址,包括日志、缓存数据等,默认当前运行目录下store文件夹#logger:#  level: info #日志级别;支持:debug|info|warn|error,默认info#maxprocs: 50 #并发协(线)程数量,默认为: CPU核数*2;一般情况下不需要设置此项#bulk_size: 1000 #每批处理数量,不写默认100,可以根据带宽、机器性能等调整;如果是全量数据初始化时redis建议设为1000,其他接收端酌情调大#prometheus相关配置#enable_exporter: true #是否启用prometheus exporter,默认false#exporter_addr: 9595 #prometheus exporter端口,默认9595#web admin相关配置enable_web_admin: true #是否启用web admin,默认falseweb_admin_port: 8060 #web监控端口,默认8060#cluster: # 集群相关配置  #name: myTransfer #集群名称,具有相同name的节点放入同一个集群  #bind_ip: 127.0.0.1 # 绑定的IP,如果机器有多张网卡(包含虚拟网卡)会有多个IP,使用这个属性绑定一个  #ZooKeeper地址,多个用逗号风格  #zk_addrs: 192.168.1.10:2181,192.168.1.11:2182,192.168.1.12:2183  #zk_authentication: 123456 #digest类型的访问秘钥,如:user:password,默认为空  #etcd_addrs: 127.0.0.1:2379 #etcd连接地址,多个用逗号分隔  #etcd_user: test #etcd用户名  #etcd_password: 123456 #etcd密码#目标类型target: kafka # 支持redis、mongodb、elasticsearch、rocketmq、kafka、rabbitmq#redis连接配置#redis_addrs: 127.0.0.1:6379 #redis地址,多个用逗号分隔#redis_group_type: cluster   # 集群类型 sentinel或者cluster#redis_master_name: mymaster # Master节点名称,如果group_type为sentinel则此项不能为空,为cluster此项无效#redis_pass: 123456 #redis密码#redis_database: 0  #redis数据库 0-16,默认0。如果group_type为cluster此项无效#mongodb连接配置#mongodb_addrs: 127.0.0.1:27017 #mongodb连接地址,多个用逗号分隔#mongodb_username: #mongodb用户名,默认为空#mongodb_password: #mongodb密码,默认为空#elasticsearch连接配置#es_addrs: 127.0.0.1:9200 #连接地址,多个用逗号分隔#es_version: 7 # Elasticsearch版本,支持6和7、默认为7#es_password:  # 用户名#es_version:  # 密码#rocketmq连接配置#rocketmq_name_servers: 127.0.0.1:9876 #rocketmq命名服务地址,多个用逗号分隔#rocketmq_group_name: transfer_test_group #rocketmq group name,默认为空#rocketmq_instance_name: transfer_test_group_ins #rocketmq instance name,默认为空#rocketmq_access_key: RocketMQ #访问控制 accessKey,默认为空#rocketmq_secret_key: 12345678 #访问控制 secretKey,默认为空#kafka连接配置kafka_addrs: 127.0.0.1:9092 #kafka连接地址,多个用逗号分隔#kafka_sasl_user:  #kafka SASL_PLAINTEXT认证模式 用户名#kafka_sasl_password: #kafka SASL_PLAINTEXT认证模式 密码#rabbitmq连接配置#rabbitmq_addr: amqp://guest:guest@127.0.0.1:5672/  #连接字符串,如: amqp://guest:guest@localhost:5672/#规则配置rule:  -    schema: test #数据库名称    table: score #表名称    #order_by_column: id #排序字段,存量数据同步时不能为空    #column_lower_case:false #列名称转为小写,默认为false    #column_upper_case:false#列名称转为大写,默认为false    column_underscore_to_camel: false #列名称下划线转驼峰,默认为false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列    include_columns: ID,name,age,sex    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  默认为空    #column_mappings: USER_NAME=account    #列名称映射,多个映射关系用逗号分隔,如:USER_NAME=account 表示将字段名USER_NAME映射为account    #default_column_values: area_name=合肥  #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss    lua_file_path: lua/sync.lua   #lua脚本文件,项目目录创建lua目录    #lua_script:   #lua 脚本    value_encoder: json  #值编码,支持json、kv-commas、v-commas;默认为json    #value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表达式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值    #redis相关    redis_structure: string # 数据类型。 支持string、hash、list、set、sortedset类型(与redis的数据类型一致)    #redis_key_prefix: USER_ #key的前缀    #redis_key_column: USER_NAME #使用哪个列的值作为key,不填写默认使用主键    #redis_key_formatter: '{{.ID}}|{{.USER_NAME}}'    #redis_key_value: user #KEY的值(固定值);当redis_structure为hash、list、set、sortedset此值不能为空    #redis_hash_field_prefix: _CARD_ #hash的field前缀,仅redis_structure为hash时起作用    #redis_hash_field_column: Cert_No #使用哪个列的值作为hash的field,仅redis_structure为hash时起作用,不填写默认使用主键    #redis_sorted_set_score_column: id #sortedset的score,当数据类型为sortedset时,此项不能为空,此项的值应为数字类型    #mongodb相关    #mongodb_database: transfer #mongodb database不能为空    #mongodb_collection: transfer_test_topic #mongodb collection,可以为空,默认使用表名称    #elasticsearch相关    #es_index: user_index #Index名称,可以为空,默认使用表(Table)名称    #es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导    #  -    #    column: REMARK #数据库列名称    #    field: remark #映射后的ES字段名称    #    type: text #ES字段类型    #    analyzer: ik_smart #ES分词器,type为text此项有意义    #    #format: #日期格式,type为date此项有意义    #  -    #    column: USER_NAME #数据库列名称    #    field: account #映射后的ES字段名称    #    type: keyword #ES字段类型    #rocketmq相关    #rocketmq_topic: transfer_test_topic #rocketmq topic,可以为空,默认使用表名称    #kafka相关    kafka_topic: test #rocketmq topic,可以为空,默认使用表名称    #rabbitmq相关    #rabbitmq_queue: user_topic #queue名称,可以为空,默认使用表(Table)名称    #reserve_raw_data: true #保留update之前的数据,针对rocketmq、kafka、rabbitmq有用;默认为false

7.6 项目启动

1. 启动zk(zkServer.sh)2. 启动kafka (kafka-server-start.sh server.properties)3. 启动go-mysql-transfer (./go-mysql-transfer)4. 启动kafka消费者(kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test)5. 编写简单的lua脚本,实现数据同步6. 验证数据同步

go-mysql-transfer/lua/sync.lua脚本内容

local json = require("json")   -- 加载json模块local ops = require("mqOps") --加载mq操作模块local os = require("os") --加载os模块local row = ops.rawRow()  --当前数据库的一行数据,local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal id = row["id"] --获取ID列的值local name = row["name"]local age = row["age"]local sex = row["sex"]local result = {}local data = {}result["timestamp"] = os.time()result["action"] = actiondata['id'] = iddata['name'] = namedata['age'] = agedata['sex'] = sexresult["object"] = datalocal val = json.encode(result) -- 将result转为jsonops.SEND("test", val) -- 发送消息,参数1:topic(string类型),参数2:消息内容

启动go-mysql-transfer
在这里插入图片描述
mysql更新数据
在这里插入图片描述

kafka收到的消息
在这里插入图片描述

  1. The Cluster ID i0yMUA_eRHuBS60eM1ph9w doesn’t match stored clusterId Some(aH
    https://blog.csdn.net/m0_59252007/article/details/119533700

1 https://www.kancloud.cn/wj596/go-mysql-transfer/2116628
2 https://www.cnblogs.com/Clera-tea/p/16517424.html

来源地址:https://blog.csdn.net/cjqh_hao/article/details/131352384

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯