文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

flink学习33:flinkSQL连接mysql,查询插入数据

2023-09-07 10:15

关注

生成运行时env

生成表环境

接上数据流,数据流数据生成表

把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册

查询表,可以根据注册表名查询

插入表,可以根据生成的flink表进行数据插入

import org.apache.flink.streaming.api.scala._import org.apache.flink.table.api.bridge.scala._import org.apache.flink.table.api._import org.apache.flink.table.api.{DataTypes, Table}import org.apache.flink.table.descriptors._object SqlReadMysql {  def main(args: Array[String]): Unit = {    // creat env    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment    //parallelism    bsEnv.setParallelism(1)    //set env    val bsSetting = EnvironmentSettings      .newInstance()      .useBlinkPlanner()      .inStreamingMode()      .build()    //create table env    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)    //create ds    val dataStream = bsEnv.fromElements(Tuple2("01","lisi" ))    val table1 = bsTableEnv.fromDataStream(dataStream)    //create table    val sinkDDL =      """        |create table student2_flink (        |code varchar(20) null,        |name varchar(20) null        |)with(        |'connector.type'='jdbc',        |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',        |'connector.table'='student2',        |'connector.driver'='com.mysql.jdbc.Driver',        |'connector.username'='root',        |'connector.password'='root'        |)        |""".stripMargin    println(sinkDDL)    // execute the create table sql    bsTableEnv.executeSql(sinkDDL)    //register table    val myStudent = bsTableEnv.from("student2_flink")    //execute query    val result = bsTableEnv.sqlQuery(s"select * from $myStudent")    result.toRetractStream[(String, String)].print()    //insert data    table1.executeInsert("student2_flink")    //execute    bsEnv.execute()  }}
    4.0.0    org.sinopharm.gksk    gksk-bigdata    1.0-SNAPSHOT    gksk-bigdata        http://www.example.com            UTF-8        1.8        1.8        1.2.17        1.7.25        1.7.25                                    org.apache.flink            flink-java            1.14.4                            org.apache.flink            flink-streaming-java_2.12            1.14.4                                    org.apache.flink            flink-scala_2.12            1.14.4                            org.apache.flink            flink-streaming-scala_2.12            1.14.4                                    org.apache.flink            flink-table-api-scala-bridge_2.12            1.14.4                            org.apache.flink            flink-table-planner_2.12            1.14.4                            org.apache.flink            flink-table-planner-blink_2.12            1.12.0            provided                            org.apache.flink            flink-table-common            1.14.4                            org.apache.flink            flink-csv            1.14.4                                    org.apache.flink            flink-connector-kafka_2.12            1.14.4                                    org.apache.flink            flink-statebackend-rocksdb_2.12            1.14.4                            org.apache.flink            flink-connector-jdbc_2.12            1.14.4                            mysql            mysql-connector-java            8.0.16                                    org.apache.flink            flink-clients_2.12            1.14.4                                    org.apache.flink            flink-runtime-web_2.12            1.14.4            runtime                                    junit            junit            4.11            test                                    org.slf4j            slf4j-log4j12            ${slf4j.version}            runtime                            log4j            log4j            ${log4j.version}            runtime                                                    src/main/java                                        src/main/scala                                                                                org.apache.maven.plugins                maven-compiler-plugin                3.1                                    1.8                    1.8                                                                    net.alchim31.maven                scala-maven-plugin                3.2.2                                                            compiletestCompile                                                                                    

原因:pom文件中缺少 planner

解决办法:添加

    org.apache.flink    flink-table-planner_2.12    1.14.4

ps:注意有时候 配置两个planner也会报错

原因:缺少mysql的jar包

解决:pom文件添加:

    mysql    mysql-connector-java    8.0.16

原因:URL没有指定时区,jdbc 6.0以上都有这个问题

解决:在URL后边加时区

'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',


 

原因:连接的URL写错了

解决:好好看看,字符 、格式

来源地址:https://blog.csdn.net/hzp666/article/details/128796575

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯