文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法

2023-10-12 12:51

关注

flinkCDC - 功能验证记录

flink 与cdc 版本使用搭配:

flink1.13.6 + flink mysql cdc 1.4.0
flink 1.16.0 + flink mysql cdc 2.3.0
flink 1.16.0 + flink mysql cdc 2.4.0
flink 1.16.0 + flink postgresql cdc 2.3.0

flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题

flink cdc

参数说明

调整chunck大小 : scan.incremental.snapshot.chunk.size
2、设置cdc模式:scan.startup.mode【initial(默认)、latest-offset】
3、支持chunk key 列设置,默认是第一个字段:scan.incremental.snapshot.chunk.key-column
官网:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html

原理分析

cdc mysql 全量快照阶段split sql :SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?;
备注:id 是主键id

(DBLog)无锁算法论文

链接地址:https://arxiv.org/pdf/2010.12597.pdf , 对此算法感兴趣的可以看这位大佬的分享:https://zhuanlan.zhihu.com/p/600303844

论文部分摘要理解:

mysql cdc

cdc api 动态加表

启动任务,复制checkpoint路径
在这里插入图片描述2、新增监听的表到tableList(可以使用同一个jar包,在外部传参动态加表)
3、从checkpoint初重启任务即可

flink cdc sql 性能压测

cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

flink cdc api 性能压测

cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

PostgreSqlCDC

执行更新语句,会出现 2 种情况

若更新字段包含(部分)主键字段,会先发送一条删除之前主键的记录。op = d , after = null ; 然后再发送一条新主键记录,op = c,且before = null 。
2、若仅更新非主键主键,只会发送一条记录,op = u , before = null。

主体代码如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 必须开启 checkpoint ,因为Flink Postgres CDC 只会在 checkpoint 完成的时候更新 Postgres slot 中的 LSN,否则磁盘使用率会一直很高        env.enableCheckpointing(1000);        //监听 postgresql wal 日志        DebeziumSourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()                .hostname(host)                .port(port)                .username(userName)                .password(passWord)                .database(dbName)                .tableList(tableList)                .deserializer(new JsonDebeziumDeserializationSchema())                .slotName(slotName)                .build();        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);        dataStreamSource.print(">>>").setParallelism(1);        env.execute();

cdc sink to kafka

AT_LEAST_ONCE 模型要配置 acks = 1

报错

mysql时区错误,The server time zone value ‘EDT’ is unrecognized or represents

登录mysql并查询当前时区:show variables like “%time_zone%”;
执行以下命令修改时区:

set global time_zone = '+8:00'; ##修改mysql全局时区为北京时间,即我们所在的东8区set time_zone = '+8:00'; ##修改当前会话时区flush privileges; #立即生效

java.lang.NoClassDefFoundError: io/debezium/connector/mysql/MySqlConnectorConfig

缺包,引入 debezium-connector-mysql-1.6.4.Final.jar包会报Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig,看 flink cdc 社区群反馈可能是

Cannot discover a connector using option: ‘connector’=‘mysql-cdc’

删除pom.xml文件flink-connector-mysql-cdc依赖报下的provided

Could not instantiate the executor. Make sure a planner module is on the classpath

包冲突原因。注释或删除jar:flink-table-planner-loader-1.16.0.jar

(source 算子 )The TaskExecutor is shutting down.

加大心跳间隔时间,默认是30s,‘heartbeat.interval’ = ‘60s’

来源地址:https://blog.csdn.net/a123147abc/article/details/131301937

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯