文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

2023-08-30 14:52

关注

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {    public static void main(String[] args) throws Exception {////        Configuration conf = new Configuration();//        conf.setInteger("rest.port",3335);//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);        //1.创建执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        //2.创建Flink-MySQL-CDC的Source        TableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +                "  id INT primary key," +                "  name STRING" +                ") WITH (" +                "  'connector' = 'mysql-cdc'," +                "  'hostname' = 'hadoop102'," +                "  'port' = '3306'," +                "  'username' = 'root'," +                "  'password' = 'xxxx'," +                "  'database-name' = 'student'," +                "  'table-name' = 'table_name'," +                "'server-time-zone' = 'Asia/Shanghai'," +                "'scan.startup.mode' = 'initial'" +                ")"        );        // 2. 注册SinkTable: sink_sensor//        tableEnv.executeSql("" +//                "CREATE TABLE kafka_binlog ( " +//                "  user_id INT, " +//                "  user_name STRING, " +//                "`proc_time` as PROCTIME()" +//                ") WITH ( " +//                "  'connector' = 'kafka', " +//                "  'topic' = 'test2', " +//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +//                "  'format' = 'json' " +//                ")" +//                "");        //upsert-kafka        tableEnv.executeSql("" +                "CREATE TABLE kafka_binlog ( " +                "  user_id INT, " +                "  user_name STRING, " +                "`proc_time` as PROCTIME()," +                "  PRIMARY KEY (user_id) NOT ENFORCED" +                ") WITH ( " +                "  'connector' = 'upsert-kafka', " +                "  'topic' = 'test2', " +                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +                "  'key.format' = 'json' ," +                "  'value.format' = 'json' " +                ")" +                "");        // 3. 从SourceTable 查询数据, 并写入到 SinkTable         tableEnv.executeSql("insert into kafka_binlog select * from table_name");         tableEnv.executeSql("select * from kafka_binlog").print();        env.execute();    }}

来源地址:https://blog.csdn.net/m0_37759590/article/details/132558090

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯