1. 简介
Flink CDC(Flink Change Data Capture)是基于数据库的日志CDC技术,实现了全增量一体化读取的数据集成框架。它搭配Flink计算框架,能够高效实现海量数据的实时集成。Flink CDC的核心功能在于实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进一步的处理和分析。通过使用Flink CDC,用户可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。
具体来说,Flink CDC的应用场景包括但不限于实时数据仓库更新、实时数据同步和迁移、实时数据处理等。它还可以确保数据一致性,并在数据发生变更时能够准确地捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,方便数据的捕获和处理。
接下来将详细的介绍关于MySQL CDC的使用。MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。
支持的数据库
Connector | Database | Driver |
mysql-cdc |
| JDBC Driver 8.0.27 |
2. 实战案例
2.1 MySQL开启Binlog
在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini),需要在[mysqld]部分设置相关参数以开启binlog功能,如下:
[mysqld]
server-id=1
# 格式,行级格式
binlog-format=Row
# binlog 日志文件的前缀
log-bin=mysql-bin
# 指定哪些数据库需要记录二进制日志
binlog_do_db=testjpa
除了开启binlog功能外,Flink CDC还需要其他配置和权限来确保能够正常连接到MySQL并读取数据。例如,需要授予Flink CDC连接MySQL的用户必要的权限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。
查看是否开启了binlog功能
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
以上就对mysql相关的配置完成了。
2.2 依赖管理
1.19.0
org.apache.flink
flink-connector-base
${flink.version}
com.ververica
flink-sql-connector-mysql-cdc
3.0.1
org.apache.flink
flink-streaming-java
${flink.version}
org.apache.flink
flink-clients
${flink.version}
org.apache.flink
flink-table-runtime
${flink.version}
2.3 代码实现
@Component
public class MonitorMySQLCDC implements InitializingBean {
// 该队列专门用来临时保存变化的数据(实际生产环境,你应该使用MQ相关的产品)
public static final LinkedBlockingQueue
启动模式:
- initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
- earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
- latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
- specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
- timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
数据处理Sink
@Component
public class CustomSink extends RichSinkFunction {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void invoke(String value, Context context) throws Exception {
System.out.printf("数据发生变化: %s%n", value);
TypeReference
以上就是实现通过FlinkCDC实时通过数据到Redis的所有代码。
2.4 Web监控页面
引入flink web依赖
org.apache.flink
flink-runtime-web
${flink.version}
环境配置
Configuration config = new Configuration() ;
config.set(RestOptions.PORT, 9090) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;
web监听9090端口。
图片
通过web控制台你可以管理查看到更多的信息。