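Overview
Create the runtime environment (env).
Create the table environment.
Attach a data stream and create a table from its data.
Declare in Flink the database table that the sink writes to (in effect, binding a Flink table to the table in the database), supply the database connection settings, execute the DDL, and register the table.
Query the table; queries can refer to it by its registered name.
Insert into the table; data can be inserted from a table created in Flink.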
Complete example:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object SqlReadMysql {
  def main(args: Array[String]): Unit = {
    // create the streaming execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // run with a single parallel task
    bsEnv.setParallelism(1)
    // environment settings: Blink planner, streaming mode
    val bsSetting = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)
    // create a DataStream and turn it into a table
    val dataStream = bsEnv.fromElements(("01", "lisi"))
    val table1 = bsTableEnv.fromDataStream(dataStream)
    // DDL that binds the Flink table student2_flink to the MySQL table student2
    val sinkDDL =
      """
        |create table student2_flink (
        |code varchar(20) null,
        |name varchar(20) null
        |)with(
        |'connector.type'='jdbc',
        |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
        |'connector.table'='student2',
        |'connector.driver'='com.mysql.jdbc.Driver',
        |'connector.username'='root',
        |'connector.password'='root'
        |)
        |""".stripMargin
    println(sinkDDL)
    // execute the DDL; this registers student2_flink in the table environment
    bsTableEnv.executeSql(sinkDDL)
    // query the table by its registered name
    val result = bsTableEnv.sqlQuery("select * from student2_flink")
    result.toRetractStream[(String, String)].print()
    // insert the rows of table1 into the sink table
    table1.executeInsert("student2_flink")
    // run the DataStream job (the print sink above)
    bsEnv.execute()
  }
}
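The DDL in the example uses the legacy `connector.type`-style option keys. The Flink JDBC SQL connector documentation (Flink 1.11 and later) also describes a newer set of keys: `connector`, `url`, `table-name`, `driver`, `username`, `password`. The sketch below only builds and prints an equivalent DDL string with those keys. The object name `SinkDdlNewOptions` is hypothetical, and whether your Flink version still accepts the legacy keys is something to check against its documentation.

```scala
// Hypothetical helper: builds the sink DDL using the newer JDBC option keys.
object SinkDdlNewOptions {
  // Same connection settings as in the complete example.
  val url: String =
    "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC"

  // The DDL text; pass it to bsTableEnv.executeSql(...) in place of sinkDDL.
  val ddl: String =
    s"""
       |CREATE TABLE student2_flink (
       |  code VARCHAR(20),
       |  name VARCHAR(20)
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = '$url',
       |  'table-name' = 'student2',
       |  'driver' = 'com.mysql.jdbc.Driver',
       |  'username' = 'root',
       |  'password' = 'root'
       |)
       |""".stripMargin

  def main(args: Array[String]): Unit = println(ddl)
}
```

Only the WITH clause differs from the example; the column list and the target MySQL table stay the same.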
POM file:
Project: org.sinopharm.gksk:gksk-bigdata:1.0-SNAPSHOT (modelVersion 4.0.0, name gksk-bigdata, url http://www.example.com)
Properties (values as listed): UTF-8, 1.8, 1.8, 1.2.17, 1.7.25, 1.7.25
Dependencies (groupId:artifactId:version, with scope where one is given):
org.apache.flink:flink-java:1.14.4
org.apache.flink:flink-streaming-java_2.12:1.14.4
org.apache.flink:flink-scala_2.12:1.14.4
org.apache.flink:flink-streaming-scala_2.12:1.14.4
org.apache.flink:flink-table-api-scala-bridge_2.12:1.14.4
org.apache.flink:flink-table-planner_2.12:1.14.4
org.apache.flink:flink-table-planner-blink_2.12:1.12.0 (provided)
org.apache.flink:flink-table-common:1.14.4
org.apache.flink:flink-csv:1.14.4
org.apache.flink:flink-connector-kafka_2.12:1.14.4
org.apache.flink:flink-statebackend-rocksdb_2.12:1.14.4
org.apache.flink:flink-connector-jdbc_2.12:1.14.4
mysql:mysql-connector-java:8.0.16
org.apache.flink:flink-clients_2.12:1.14.4
org.apache.flink:flink-runtime-web_2.12:1.14.4 (runtime)
junit:junit:4.11 (test)
org.slf4j:slf4j-log4j12:${slf4j.version} (runtime)
log4j:log4j:${log4j.version} (runtime)
Build: source directories src/main/java and src/main/scala
Plugins: org.apache.maven.plugins:maven-compiler-plugin:3.1 (1.8); net.alchim31.maven:scala-maven-plugin:3.2.2 (goals compile, testCompile)
Could not instantiate the executor. Make sure a planner module is on the classpath
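Cause: the POM file is missing the planner dependency.
Fix: add
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.12</artifactId>
  <version>1.14.4</version>
</dependency>
PS: declaring two planner modules at the same time (for example, both flink-table-planner and flink-table-planner-blink) can sometimes cause an error as well.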
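Flink SQL fails to connect to MySQL: JDBC-Class not found. - com.mysql.jdbc.Driver
Cause: the MySQL driver JAR is missing from the classpath.
Fix: add the following to the POM file:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.16</version>
</dependency>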
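To confirm that the driver is actually on the classpath before starting the Flink job, a small check like the following can help. It is a minimal sketch: the object name `DriverCheck` and its method are hypothetical. It tries both the legacy class name used in the DDL and `com.mysql.cj.jdbc.Driver`, the class name used by Connector/J 8.x (where `com.mysql.jdbc.Driver` is kept as a deprecated alias).

```scala
import scala.util.Try

// Hypothetical helper: reports whether a JDBC driver class can be loaded.
object DriverCheck {
  // Returns true if the named class can be loaded from the current classpath.
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  def main(args: Array[String]): Unit = {
    // Legacy driver class name and the Connector/J 8.x class name.
    val candidates = Seq("com.mysql.jdbc.Driver", "com.mysql.cj.jdbc.Driver")
    candidates.foreach { name =>
      val status = if (isOnClasspath(name)) "found" else "NOT found"
      println(s"$name: $status")
    }
  }
}
```

Run it with the same classpath as the job; if both names are reported as not found, the dependency is not being picked up.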
open() failed.The server time zone value '�й���ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
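Cause: the URL does not specify a time zone; this affects MySQL Connector/J (the JDBC driver) 6.0 and later.
Fix: append the time zone parameter to the URL:
'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
- useUnicode=true: use Unicode characters, so Chinese text can be handled
- characterEncoding=utf8: sets the character encoding
- useSSL=true: use a secure (SSL) connection
- serverTimezone=UTC: sets the server time zone to UTC (Coordinated Universal Time)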
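Assembling the URL from named parameters makes it harder to drop one by accident. The following is a minimal sketch; the object name `JdbcUrl` and its `build` method are hypothetical, and it assumes the parameter values need no URL encoding.

```scala
// Hypothetical helper: assembles a MySQL JDBC URL from its parts.
object JdbcUrl {
  // Joins host, port, database and query parameters into one URL string.
  // Assumes parameter names and values need no URL encoding.
  def build(host: String, port: Int, database: String,
            params: Seq[(String, String)]): String = {
    val query = params.map { case (k, v) => s"$k=$v" }.mkString("&")
    val base = s"jdbc:mysql://$host:$port/$database"
    if (query.isEmpty) base else s"$base?$query"
  }

  def main(args: Array[String]): Unit = {
    val url = build("localhost", 3306, "test", Seq(
      "useUnicode" -> "true",
      "characterEncoding" -> "utf8",
      "useSSL" -> "true",
      "serverTimezone" -> "UTC"
    ))
    println(url)
  }
}
```

With the four parameters listed above, `build` produces the same URL as the one in the DDL.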
open() failed.Cannot load connection class because of underlying exception: com.mysql.cj.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.
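Cause: the connection URL is written incorrectly.
Fix: check the URL carefully, character by character, for wrong or stray characters and for formatting mistakes.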
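A rough pre-check can catch the most common mistakes (stray whitespace, a wrong prefix, a missing host) before the URL reaches the driver. This is a sketch only: `JdbcUrlCheck` is a hypothetical helper built on `java.net.URI`, not Connector/J's own parser, so a URL that passes here may still be rejected by the driver.

```scala
import java.net.URI
import scala.util.Try

// Hypothetical helper: coarse sanity check for a MySQL JDBC URL.
object JdbcUrlCheck {
  // Returns Right(url) if the part after "jdbc:" parses as a URI with a host,
  // otherwise Left with a short reason.
  def check(url: String): Either[String, String] = {
    if (url != url.trim) Left("leading or trailing whitespace")
    else if (!url.startsWith("jdbc:mysql://")) Left("must start with jdbc:mysql://")
    else Try(new URI(url.stripPrefix("jdbc:"))).toEither match {
      case Left(e)                           => Left(s"not a valid URI: ${e.getMessage}")
      case Right(uri) if uri.getHost == null => Left("missing host")
      case Right(_)                          => Right(url)
    }
  }

  def main(args: Array[String]): Unit = {
    println(check("jdbc:mysql://localhost:3306/test?serverTimezone=UTC"))
    // A space inside the URL is rejected by java.net.URI.
    println(check("jdbc:mysql://localhost:3306/test? serverTimezone=UTC"))
  }
}
```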