
Syncing MySQL Data to Elasticsearch and Kafka with Dinky + FlinkSQL + Flink CDC

2023-09-27 05:46


I. Overview

Dinky is an out-of-the-box, one-stop real-time computing platform built on Apache Flink. It connects to many frameworks such as OLAP engines and data lakes, and focuses on unified stream-batch and lakehouse practice. In this article it serves as the FlinkSQL visualization tool.

Flink SQL makes it simple to build streaming applications with standard SQL, with no application code to write.

Flink CDC: this article uses the MySQL CDC connector, which can read both snapshot data and incremental data from a MySQL database.
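Because the connector tails the MySQL binlog for the incremental phase, binary logging must be enabled with ROW format on the source instance. A quick check, reusing the root credentials that appear later in this article:

### Verify the MySQL binlog prerequisites for CDC
mysql -uroot -p123456 -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id');"
# expect: log_bin = ON, binlog_format = ROW, and a non-zero server_id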

Environment and main software versions: Flink 1.14.6 (Scala 2.12), Kafka 3.4.0 (KRaft mode), Dinky 0.7.3, flink-sql-connector-mysql-cdc 2.3.0, Elasticsearch 7.x, and MySQL as the source database.

II. Software Installation and Deployment

This article uses the simplest single-node setup for every component.

1. Flink

tar -xzf flink-1.14.6-bin-scala_2.12.tgz
cd flink-1.14.6 && ls -l

### Edit flink-1.14.6/conf/flink-conf.yaml and change the IPs and paths to match your host
jobmanager.rpc.address: 192.168.xxx.xxx
taskmanager.host: 192.168.xxx.xxx
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 10
rest.port: 8888
rest.address: 0.0.0.0
web.tmpdir: /home/soft/flink_/flink-1.14.6/tmp
akka.ask.timeout: 600s

### Put flink-sql-connector-kafka_2.12-1.14.6.jar, flink-sql-connector-elasticsearch7_2.12-1.14.6.jar
### and flink-sql-connector-mysql-cdc-2.3.0.jar into flink-1.14.6/lib/

### Start the cluster
./bin/start-cluster.sh

Once the cluster is up, the Flink web UI is available at http://192.168.xxx.xxx:8888/.
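As a quick sanity check that the cluster really started, hit the REST API that backs the web UI and look for the two standalone-cluster processes:

### Optional sanity check: the REST API behind the web UI answers on the same port
curl http://192.168.xxx.xxx:8888/overview
### and jps should list the two standalone-cluster processes
jps -l
# org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
# org.apache.flink.runtime.taskexecutor.TaskManagerRunner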

2. Kafka

tar -xzf kafka_2.12-3.4.0.tgz
cd kafka_2.12-3.4.0
bin/kafka-storage.sh random-uuid > uuid

vi config/kraft/server.properties
controller.quorum.voters=1@192.168.XXX.XXX:9093
advertised.listeners=PLAINTEXT://192.168.XXX.XXX:9092
log.dirs=/home/soft/kafka_2.12-3.4.0/kraft-combined-logs

### Format the Kafka log directory with the UUID generated above
bin/kafka-storage.sh format -t `cat uuid` -c config/kraft/server.properties
Formatting /home/soft/kafka_2.12-3.4.0/kraft-combined-logs with metadata.version 3.4-IV0.

### Start the Kafka server
bin/kafka-server-start.sh -daemon config/kraft/server.properties
### Once the server is up you have a basic Kafka environment ready to use
jps -l
14085 kafka.Kafka

### Notes on the start commands
### 1. Foreground:
###    bin/kafka-server-start.sh config/server.properties
###    Runs in the current terminal; closing the window or pressing Ctrl+C kills the Kafka process.
### 2. Background:
###    (1) nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
###    (2) bin/kafka-server-start.sh -daemon config/server.properties
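Before wiring Flink to the broker, a quick produce/consume round trip with the bundled CLI tools confirms Kafka works; smoke-test is just a scratch topic name chosen here:

### Optional smoke test with the bundled CLI tools
bin/kafka-topics.sh --create --topic smoke-test --bootstrap-server 192.168.XXX.XXX:9092
bin/kafka-console-producer.sh --topic smoke-test --bootstrap-server 192.168.XXX.XXX:9092
# type a line, press Enter, then Ctrl+C; read it back:
bin/kafka-console-consumer.sh --topic smoke-test --from-beginning --bootstrap-server 192.168.XXX.XXX:9092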

3. Dinky

Deployment reference: http://www.dlink.top//docs/next/deploy_guide/build

  1. Database initialization

# Log in to MySQL
mysql -uroot -p123456
# Create the database
mysql> create database dinky;
# Grant privileges
mysql> grant all privileges on dinky.* to 'dinky'@'%' identified by 'dinky@123' with grant option;
mysql> flush privileges;
# Log back in as the dinky user
mysql -h fdw1 -udinky -pdinky@123
mysql> use dinky;
# The SQL file ships inside the downloaded release package
mysql> source /home/soft/dinky/sql/dinky.sql
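If the script ran cleanly, the dinky schema should now contain Dinky's metadata tables; a quick way to confirm:

# Optional check that initialization created Dinky's metadata tables
mysql -h fdw1 -udinky -pdinky@123 -D dinky -e "SHOW TABLES;"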

  2. Dinky deployment

tar -zxvf dlink-release-0.7.3.tar.gz
# rename the extracted directory (not the tarball)
mv dlink-release-0.7.3 dinky
# Switch to the config directory and edit the database and port settings
cd /home/soft/dinky/config/
vi application.yml
# -------------------------------------------------------------------------------
spring:
    datasource:
        url: jdbc:mysql://${MYSQL_ADDR:192.168.XXX.XXX:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
        username: ${MYSQL_USERNAME:dinky}
        password: ${MYSQL_PASSWORD:dinky@123}
server:
    port: 8899
# -------------------------------------------------------------------------------
# Load dependencies: Dinky needs its own Flink environment. Create the
# plugins/flink${FLINK_VERSION} folder under the Dinky root directory and copy in the
# required Flink dependencies such as flink-dist and flink-table.
cp ${FLINK_HOME}/lib/flink-*.jar /home/soft/dinky/plugins/flink1.14/
# The folder should end up containing jars such as:
#   flink-csv-1.14.6.jar
#   flink-dist_2.12-1.14.6.jar
#   flink-json-1.14.6.jar
#   flink-shaded-zookeeper-3.4.14.jar
#   flink-sql-connector-elasticsearch7_2.12-1.14.6.jar
#   flink-sql-connector-kafka_2.12-1.14.6.jar
#   flink-sql-connector-mysql-cdc-2.3.0.jar
# Then start Dinky from its root directory
cd /home/soft/dinky
sh auto.sh start 1.14

After startup, open http://192.168.xxx.xxx:8899/ and log in with admin/admin to work with the system visually.

III. Creating and Submitting Jobs

Scenario: MySQL currently holds four tables, t1, t2, t3 and t4. The result of joining t1 and t2 must be synced to the Elasticsearch index search01_index, and the result of joining t3 and t4 must be synced to the same search01_index index. The index mappings are generated automatically when a Spring Data Elasticsearch project starts, so only the data itself needs to be synced. The same data is also pushed to Kafka for other downstream consumers.
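For orientation, here is a minimal sketch of what the first pair of source tables might look like in MySQL. The column names mirror the Flink DDL used below (t1 = test1, t2 = test2); the MySQL column types, the your_schema database name, and the DEFAULT values are assumptions, not the real production schema:

# Hypothetical shape of the t1/t2 source tables (types are assumptions)
mysql -uroot -p123456 your_schema <<'EOF'
CREATE TABLE test1 (
    id       VARCHAR(64) PRIMARY KEY,
    name     VARCHAR(255),
    del_flag TINYINT DEFAULT 0
);
CREATE TABLE test2 (
    id           VARCHAR(64) PRIMARY KEY,
    doc_id       VARCHAR(64),   -- joins to test1.id
    event_detail TEXT,
    country      VARCHAR(64),
    `time`       VARCHAR(32),
    entity_type  VARCHAR(64),
    entity_name  VARCHAR(255),
    type         TINYINT,
    del_flag     TINYINT DEFAULT 0
);
EOF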

1. Using Flink's built-in sql-client.sh

Reference for mapping database column types to Flink SQL types: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/

MySQL to Elasticsearch

### From the flink/bin directory, start the SQL client with ./sql-client.sh
Flink SQL> SET execution.checkpointing.interval = 3s;

### source 1
Flink SQL> CREATE TABLE IF NOT EXISTS test1 (
    id STRING,
    name STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test1>',
    'table-name' = 'test1'
);

### source 2
Flink SQL> CREATE TABLE IF NOT EXISTS test2 (
    id STRING,
    doc_id STRING,
    event_detail STRING,
    country STRING,
    `time` STRING,
    entity_type STRING,
    entity_name STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test2>',
    'table-name' = 'test2'
);

### source 3
Flink SQL> CREATE TABLE IF NOT EXISTS test3 (
    id STRING,
    title STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test3>',
    'table-name' = 'test3'
);

### source 4
Flink SQL> CREATE TABLE IF NOT EXISTS test4 (
    id STRING,
    `time` STRING,
    country STRING,
    entity_type STRING,
    entity_name STRING,
    detail STRING,
    fo_id STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test4>',
    'table-name' = 'test4'
);

### sink
Flink SQL> CREATE TABLE IF NOT EXISTS search01_index (
    id STRING,
    name STRING,
    eventDetail STRING,
    country STRING,
    `time` STRING,
    entityType STRING,
    entityName STRING,
    type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://192.168.XXX.XXX:9200',
    'index' = 'search01_index',
    'sink.bulk-flush.max-actions' = '1'  -- flush every record; simple but slow, fine for a demo
);

### transform 1: rows from t1 and t3 may carry the same ID in the shared index,
### so build the document ID by concatenating with a prefix
Flink SQL> INSERT INTO search01_index
    SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
        t1.name,
        t2.event_detail AS eventDetail,
        t2.country,
        t2.`time`,
        t2.entity_type AS entityType,
        t2.entity_name AS entityName,
        t2.type
    FROM test1 t1
    LEFT JOIN test2 t2 ON t1.id = t2.doc_id
    WHERE t1.del_flag = 0
    AND t2.del_flag = 0;

### transform 2: same ID prefixing, for the t3/t4 join
Flink SQL> INSERT INTO search01_index
    SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
        t3.title AS name,
        t4.detail AS eventDetail,
        t4.country,
        t4.`time`,
        t4.entity_type AS entityType,
        t4.entity_name AS entityName,
        t3.type
    FROM test3 t3
    LEFT JOIN test4 t4 ON t3.id = t4.fo_id
    WHERE t3.del_flag = 0
    AND t4.del_flag = 0;

Once these statements are running, open the Flink UI at http://192.168.xxx.xxx:8888/ to see the jobs under Running Jobs. Then insert, update, and delete rows in the MySQL tables and watch the changes appear in Elasticsearch.
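A quick way to watch the Elasticsearch side, assuming the host and port configured in the sink above:

### Count and inspect the synced documents
curl -s 'http://192.168.XXX.XXX:9200/search01_index/_count?pretty'
curl -s 'http://192.168.XXX.XXX:9200/search01_index/_search?pretty&size=5'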

MySQL to Kafka

### From the flink/bin directory, start the SQL client with ./sql-client.sh
Flink SQL> SET execution.checkpointing.interval = 3s;

### source 1
Flink SQL> CREATE TABLE IF NOT EXISTS test1 (
    id STRING,
    name STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test1>',
    'table-name' = 'test1'
);

### source 2
Flink SQL> CREATE TABLE IF NOT EXISTS test2 (
    id STRING,
    doc_id STRING,
    event_detail STRING,
    country STRING,
    `time` STRING,
    entity_type STRING,
    entity_name STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test2>',
    'table-name' = 'test2'
);

### source 3
Flink SQL> CREATE TABLE IF NOT EXISTS test3 (
    id STRING,
    title STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test3>',
    'table-name' = 'test3'
);

### source 4
Flink SQL> CREATE TABLE IF NOT EXISTS test4 (
    id STRING,
    `time` STRING,
    country STRING,
    entity_type STRING,
    entity_name STRING,
    detail STRING,
    fo_id STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test4>',
    'table-name' = 'test4'
);

### sink
Flink SQL> CREATE TABLE IF NOT EXISTS test_kakfa (
    id STRING,
    name STRING,
    eventDetail STRING,
    country STRING,
    `time` STRING,
    entityType STRING,
    entityName STRING,
    type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kakfa',
    'properties.bootstrap.servers' = '192.168.XXX.XXX:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'debezium-json.ignore-parse-errors' = 'true'
);

### transform 1: rows from t1 and t3 may carry the same ID, so build the record ID
### by concatenating with a prefix
Flink SQL> INSERT INTO test_kakfa
    SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
        t1.name,
        t2.event_detail AS eventDetail,
        t2.country,
        t2.`time`,
        t2.entity_type AS entityType,
        t2.entity_name AS entityName,
        t2.type
    FROM test1 t1
    LEFT JOIN test2 t2 ON t1.id = t2.doc_id
    WHERE t1.del_flag = 0
    AND t2.del_flag = 0;

### transform 2: same ID prefixing, for the t3/t4 join
Flink SQL> INSERT INTO test_kakfa
    SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
        t3.title AS name,
        t4.detail AS eventDetail,
        t4.country,
        t4.`time`,
        t4.entity_type AS entityType,
        t4.entity_name AS entityName,
        t3.type
    FROM test3 t3
    LEFT JOIN test4 t4 ON t3.id = t4.fo_id
    WHERE t3.del_flag = 0
    AND t4.del_flag = 0;

Once the job is running, the Kafka-map UI shows the data arriving in Kafka, in this format:

{"before":null,"after":{"id":"fo_48_50","name":"啊","eventDetail":"内容2","country":"日本","time":"2023-06-01","entityType":"实体2","entityName":"名称2","type":3},"op":"c"}

2. Using Dinky

In Dinky's Data Studio, create a FlinkSQL job for each of the four pipelines below, paste in the script, and submit it; each script is self-contained.

-- Dinky job 1: join test1 and test2, sink to the Kafka topic test_kakfa
CREATE TABLE IF NOT EXISTS test1 (
    id STRING,
    name STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test1>',
    'table-name' = 'test1'
);

CREATE TABLE IF NOT EXISTS test2 (
    id STRING,
    doc_id STRING,
    event_detail STRING,
    country STRING,
    `time` STRING,
    entity_type STRING,
    entity_name STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test2>',
    'table-name' = 'test2'
);

CREATE TABLE IF NOT EXISTS test_kakfa (
    id STRING,
    name STRING,
    eventDetail STRING,
    country STRING,
    `time` STRING,
    entityType STRING,
    entityName STRING,
    type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kakfa',
    'properties.bootstrap.servers' = '192.168.XXX.XXX:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'debezium-json.ignore-parse-errors' = 'true'
);

INSERT INTO test_kakfa
SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
    t1.name,
    t2.event_detail AS eventDetail,
    t2.country,
    t2.`time`,
    t2.entity_type AS entityType,
    t2.entity_name AS entityName,
    t2.type
FROM test1 t1
LEFT JOIN test2 t2 ON t1.id = t2.doc_id
WHERE t1.del_flag = 0
AND t2.del_flag = 0;
-- Dinky job 2: join test3 and test4, sink to the Kafka topic test_kakfa
CREATE TABLE IF NOT EXISTS test3 (
    id STRING,
    title STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test3>',
    'table-name' = 'test3'
);

CREATE TABLE IF NOT EXISTS test4 (
    id STRING,
    `time` STRING,
    country STRING,
    entity_type STRING,
    entity_name STRING,
    detail STRING,
    fo_id STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test4>',
    'table-name' = 'test4'
);

CREATE TABLE IF NOT EXISTS test_kakfa (
    id STRING,
    name STRING,
    eventDetail STRING,
    country STRING,
    `time` STRING,
    entityType STRING,
    entityName STRING,
    type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kakfa',
    'properties.bootstrap.servers' = '192.168.XXX.XXX:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'debezium-json.ignore-parse-errors' = 'true'
);

INSERT INTO test_kakfa
SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
    t3.title AS name,
    t4.detail AS eventDetail,
    t4.country,
    t4.`time`,
    t4.entity_type AS entityType,
    t4.entity_name AS entityName,
    t3.type
FROM test3 t3
LEFT JOIN test4 t4 ON t3.id = t4.fo_id
WHERE t3.del_flag = 0
AND t4.del_flag = 0;
-- Dinky job 3: join test1 and test2, sink to the Elasticsearch index search01_index
CREATE TABLE IF NOT EXISTS test1 (
    id STRING,
    name STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test1>',
    'table-name' = 'test1'
);

CREATE TABLE IF NOT EXISTS test2 (
    id STRING,
    doc_id STRING,
    event_detail STRING,
    country STRING,
    `time` STRING,
    entity_type STRING,
    entity_name STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test2>',
    'table-name' = 'test2'
);

CREATE TABLE IF NOT EXISTS search01_index (
    id STRING,
    name STRING,
    eventDetail STRING,
    country STRING,
    `time` STRING,
    entityType STRING,
    entityName STRING,
    type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://192.168.XXX.XXX:9200',
    'index' = 'search01_index',
    'sink.bulk-flush.max-actions' = '1'
);

INSERT INTO search01_index
SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
    t1.name,
    t2.event_detail AS eventDetail,
    t2.country,
    t2.`time`,
    t2.entity_type AS entityType,
    t2.entity_name AS entityName,
    t2.type
FROM test1 t1
LEFT JOIN test2 t2 ON t1.id = t2.doc_id
WHERE t1.del_flag = 0
AND t2.del_flag = 0;
-- Dinky job 4: join test3 and test4, sink to the Elasticsearch index search01_index
CREATE TABLE IF NOT EXISTS test3 (
    id STRING,
    title STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test3>',
    'table-name' = 'test3'
);

CREATE TABLE IF NOT EXISTS test4 (
    id STRING,
    `time` STRING,
    country STRING,
    entity_type STRING,
    entity_name STRING,
    detail STRING,
    fo_id STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = '<schema of test4>',
    'table-name' = 'test4'
);

CREATE TABLE IF NOT EXISTS search01_index (
    id STRING,
    name STRING,
    eventDetail STRING,
    country STRING,
    `time` STRING,
    entityType STRING,
    entityName STRING,
    type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://192.168.XXX.XXX:9200',
    'index' = 'search01_index',
    'sink.bulk-flush.max-actions' = '1'
);

INSERT INTO search01_index
SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
    t3.title AS name,
    t4.detail AS eventDetail,
    t4.country,
    t4.`time`,
    t4.entity_type AS entityType,
    t4.entity_name AS entityName,
    t3.type
FROM test3 t3
LEFT JOIN test4 t4 ON t3.id = t4.fo_id
WHERE t3.del_flag = 0
AND t4.del_flag = 0;


That completes the setup. I am not yet very familiar with Dinky, so there may well be a better way to do this; feedback and discussion are welcome.

Source: https://blog.csdn.net/MHXSH_1_0/article/details/131072318

