文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot整合Flink CDC,实时追踪数据变动,无缝同步至Redis

2024-11-29 23:25

关注

1. 简介

Flink CDC(Flink Change Data Capture)是基于数据库的日志CDC技术,实现了全增量一体化读取的数据集成框架。它搭配Flink计算框架,能够高效实现海量数据的实时集成。Flink CDC的核心功能在于实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进一步的处理和分析。通过使用Flink CDC,用户可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。

具体来说,Flink CDC的应用场景包括但不限于实时数据仓库更新、实时数据同步和迁移、实时数据处理等。它还可以确保数据一致性,并在数据发生变更时能够准确地捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,方便数据的捕获和处理。

接下来将详细的介绍关于MySQL CDC的使用。MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。

支持的数据库

Connector

Database

Driver

mysql-cdc

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27

2. 实战案例

2.1 MySQL开启Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini),需要在[mysqld]部分设置相关参数以开启binlog功能,如下:

[mysqld]
server-id=1
# 格式,行级格式
binlog-format=Row
# binlog 日志文件的前缀
log-bin=mysql-bin
# 指定哪些数据库需要记录二进制日志
binlog_do_db=testjpa

除了开启binlog功能外,Flink CDC还需要其他配置和权限来确保能够正常连接到MySQL并读取数据。例如,需要授予Flink CDC连接MySQL的用户必要的权限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。

查看是否开启了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

以上就对mysql相关的配置完成了。

2.2 依赖管理


  1.19.0


  org.apache.flink
  flink-connector-base
  ${flink.version}


  com.ververica
  flink-sql-connector-mysql-cdc
  3.0.1


  org.apache.flink
  flink-streaming-java
  ${flink.version}


  org.apache.flink
  flink-clients
  ${flink.version}


  org.apache.flink
  flink-table-runtime
  ${flink.version}

2.3 代码实现

@Component
public class MonitorMySQLCDC implements InitializingBean {


  // 该队列专门用来临时保存变化的数据(实际生产环境,你应该使用MQ相关的产品)
  public static final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>() ;
  
  private final StringRedisTemplate stringRedisTemplate ;
  // 保存到redis中key的前缀
  private final String PREFIX = "users:" ;
  // 数据发生变化后的sink处理
  private final CustomSink customSink ;
  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
    this.customSink = customSink ;
    this.stringRedisTemplate = stringRedisTemplate ;
  }
  
  @Override
  public void afterPropertiesSet() throws Exception {
    // 启动异步线程,实时处理队列中的数据
    new Thread(() -> {
      while(true) {
        try {
          Map result = queue.take();
          this.doAction(result) ;
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start() ;
    Properties jdbcProperties = new Properties() ;
    jdbcProperties.setProperty("useSSL", "false") ;
    MySqlSource source = MySqlSource.builder()
        .hostname("127.0.0.1")
        .port(3306)
        // 可配置多个数据库
        .databaseList("testjpa")
        // 可配置多个表
        .tableList("testjpa.users")
        .username("root")
        .password("123123")
        .jdbcProperties(jdbcProperties)
        // 包括schema的改变
        .includeSchemaChanges(true)
        // 反序列化设置
        // .deserializer(new StringDebeziumDeserializationSchema())
        .deserializer(new JsonDebeziumDeserializationSchema(true))
        // 启动模式;关于启动模式下面详细介绍
        .startupOptions(StartupOptions.initial())
        .build() ;
    // 环境配置
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
    // 设置 6s 的 checkpoint 间隔
    env.enableCheckpointing(6000) ;
    // 设置 source 节点的并行度为 4
    env.setParallelism(4) ;
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
        // 添加Sink
        .addSink(this.customSink) ;
    env.execute() ;
  }
  
  @SuppressWarnings("unchecked")
  private void doAction(Map result) throws Exception {
    Map payload = (Map) result.get("payload") ;
    String op = (String) payload.get("op") ;
    switch (op) {
      // 更新和插入操作
      case "u", "c" -> {
        Map after = (Map) payload.get("after") ;
        String id = after.get("id").toString();
        System.out.printf("操作:%s, ID: %s%n", op, id) ;
        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;
      }
      // 删除操作
      case "d" -> {
        Map after = (Map) payload.get("before") ;
        String id = after.get("id").toString();
        stringRedisTemplate.delete(PREFIX + id) ;
      } 
    }
  }
  
}

启动模式:

数据处理Sink

@Component
public class CustomSink extends RichSinkFunction {


  private ObjectMapper mapper = new ObjectMapper();


  @Override
  public void invoke(String value, Context context) throws Exception {
    System.out.printf("数据发生变化: %s%n", value);
    TypeReference> valueType = new TypeReference>() {
    };
    Map result = mapper.readValue(value, valueType);
    Map payload = (Map) result.get("payload");
    String op = (String) payload.get("op") ;
    // 不对读操作处理
    if (!"r".equals(op)) {
      MonitorMySQLCDC.queue.put(result);
    }
  }
}

以上就是实现通过FlinkCDC实时通过数据到Redis的所有代码。

2.4 Web监控页面

引入flink web依赖


  org.apache.flink
  flink-runtime-web
  ${flink.version}

环境配置

Configuration config = new Configuration() ;
config.set(RestOptions.PORT, 9090) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web监听9090端口。

图片

通过web控制台你可以管理查看到更多的信息。

来源:Spring全家桶实战案例源码内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯