Environment
Flink 1.15.4
Motivation
Quite a few tools already support no-code, real-time synchronization from MySQL to Doris.
However, most of them either cannot follow table structure changes or cannot write to multiple sinks. Our business needs schema changes propagated in real time, because tables are modified at the column level: column types are altered and columns are renamed, dropped, and added.
So to get whole-database synchronization together with real-time schema evolution, we had to write it ourselves.
Required JARs
flink-doris-connector-1.15-1.4.0.jar -- provides the "one job, ten thousand tables" whole-database sync
flink-sql-connector-mysql-cdc-2.4.0.jar -- a fat jar that already bundles Debezium, the CDC runtime, and all other dependencies, so nothing else has to be imported
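If the job is built with sbt instead of dropping the JARs into Flink's lib/ directory, the same two connectors can be declared as dependencies. A minimal sketch, assuming the usual Maven Central coordinates for these versions:

// build.sbt -- sketch only; the coordinates are assumed to match the JARs listed above
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-scala_2.12"    % "1.15.4" % "provided",
  "org.apache.doris" % "flink-doris-connector-1.15"    % "1.4.0",
  "com.ververica"    % "flink-sql-connector-mysql-cdc" % "2.4.0"
)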
Workflow
1. A script creates the target databases and tables
2. A program that keeps the table structure (DDL) in sync
3. The Flink CDC program
Compared with the first version ("Using Flink CDC to load MySQL data and table structure into Apache Doris in real time"), this version is more efficient:
During the initial full sync, keyBy followed by windowed aggregation caused data skew; the key is now salted with a random suffix so that a hot table is spread across several key groups.
The windowed aggregation collects rows into a JSONArray instead of concatenating strings, which avoids the backpressure the aggregation used to create: string concatenation becomes far too slow once the data volume grows (see the sketch below).
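To make the aggregation change concrete, here is a minimal, self-contained sketch of the two merge strategies; the string-based variant stands in for the first version and is only illustrative:

import com.alibaba.fastjson2.{JSONArray, JSONObject}

object MergeSketch {
  // First version (illustrative): accumulate rows by string concatenation.
  // Every merge copies the whole accumulated payload, so the work per window grows quadratically.
  def mergeAsString(acc: String, row: String): String = acc + "," + row

  // This version: each record carries a one-element JSONArray and merges are in-place appends,
  // so a window pane ends up as a single JSONArray ready for one stream load.
  def mergeAsArray(acc: JSONArray, rows: JSONArray): JSONArray = {
    acc.addAll(rows)
    acc
  }

  def main(args: Array[String]): Unit = {
    val a = new JSONArray(); a.add(new JSONObject())
    val b = new JSONArray(); b.add(new JSONObject())
    println(mergeAsArray(a, b).size()) // prints 2
  }
}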
Flink CDC code
1. FlinkSingleSync.scala
package com.zbkj.sync

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.zbkj.util.SinkBuilder.getKafkaSink
import com.zbkj.util._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

import java.util.Properties

object FlinkSingleSync {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val parameters: ParameterTool = ParameterTool.fromArgs(args)
    val memberID = parameters.getInt("memberID", 0)
    val source = parameters.get("source", "")
    val log = parameters.getBoolean("log", true)
    if (memberID == 0) {
      sys.exit(0)
    }
    val thisMember = "ttk_member_%d".format(memberID)
    val jobName = "Sync Member %d".format(memberID)
    val syncTopic = "sync_data_%d".format(memberID)
    println(syncTopic)
    val sourceFormat = SourceFormat.sourceFormat(source)
    env.setParallelism(4)

    // Enable checkpointing and set the trigger interval in milliseconds; checkpointing is off by default
    env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout, default is 10 minutes
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    env.getCheckpointConfig.setCheckpointStorage("file:///data/flink-checkpoints/sync/%d".format(memberID))
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // Only one checkpoint may be in flight by default; raising this allows concurrent checkpoints
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Number of tolerable checkpoint failures; the default 0 means no failure is tolerated
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
    env.disableOperatorChaining()
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30000L))

    val dataBaseList = thisMember
    var tableList = thisMember + ".*"
    if (!log) {
      // The rest of this table-filter regex was truncated in the source article
      tableList = "lb_crm_customer_log|.*(?"
    }

    // The MySqlSource construction was also truncated in the source article; the block below is a
    // reconstruction based on the imports and the downstream code, and the props.* names used for
    // the MySQL connection are assumptions.
    val customConverterConfigs = new java.util.HashMap[String, Object]()
    customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.mysql_host)
      .port(props.mysql_port)
      .databaseList(dataBaseList)
      .tableList(tableList)
      .username(props.mysql_user)
      .password(props.mysql_password)
      .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
      .startupOptions(sourceFormat)
      .includeSchemaChanges(true) // emit DDL events so schema changes can be forwarded
      .build()

    val dorisStreamLoad = new DorisStreamLoad2(props)

    val streamSource: DataStream[JSONObject] = env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      .map(line => JSON.parseObject(line)).setParallelism(4)

    // Debezium schema-change events carry no "op" field, so they can be split off as the DDL stream
    val DDLSqlStream: DataStream[JSONObject] = streamSource.filter(line => !line.containsKey("op")).uid("ddlSqlStream")
    val DMLStream: DataStream[JSONObject] = streamSource.filter(line => line.containsKey("op")).uid("dmlStream")

    val DMLDataStream = FlinkCDCSyncETL.binLogETLOne(DMLStream)

    // Aggregate per (mergeType, db, salted table) in small tumbling windows, merge the row
    // JSONArrays, then strip the random salt from the table name
    val keyByDMLDataStream: DataStream[(String, String, String, JSONArray)] = DMLDataStream
      .keyBy(keys => (keys._1, keys._2, keys._3))
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(props.window_time_milliseconds)))
      .reduce((itemFirst, itemSecond) => (itemFirst._1, itemFirst._2, itemFirst._3, combineJsonArray(itemFirst._4, itemSecond._4)))
      .map(line => (line._1, line._2, line._3.split("-")(0), line._4))
      .name("group and aggregate").uid("keyBy")

    keyByDMLDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("write to Doris").uid("SinkDoris").setParallelism(4)

    // DDL statements go to Kafka for the table-structure sync program
    val DDLKafkaSink = getKafkaSink("schema_change")
    DDLSqlStream.map(jsObj => jsObj.toJSONString()).sinkTo(DDLKafkaSink).name("DDL to Kafka").uid("SinkDDLKafka")

    // Notify downstream consumers, once per second, which member/table received data
    val kafkaSink = getKafkaSink(syncTopic)
    keyByDMLDataStream.map(line => (line._2, line._3, 1)).filter(!_._2.endsWith("_sql"))
      .keyBy(keys => (keys._1, keys._2))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(2)
      .map(line => {
        val json = new JSONObject()
        json.put("member_id", line._1)
        json.put("table", line._2)
        json.toJSONString()
      }).sinkTo(kafkaSink).name("table activity to Kafka").uid("syncDataTableToKafka")

    env.execute(jobName)
  }

  def combineJsonArray(jsr1: JSONArray, jsr2: JSONArray): JSONArray = {
    jsr1.addAll(jsr2)
    jsr1
  }
}
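The job reads all of its settings from the command line through ParameterTool, so a submission looks roughly like the line below. The jar name and parameter values are only illustrative: --memberID selects the ttk_member_<id> database, --source is handed to SourceFormat.sourceFormat (not shown here) to pick the startup mode, and --log controls whether the log tables are captured.

flink run -c com.zbkj.sync.FlinkSingleSync flink-single-sync.jar --memberID 1001 --source initial --log true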
2. FlinkCDCSyncETL.scala
package com.zbkj.util

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.DataStream

import java.util.Random

object FlinkCDCSyncETL {

  def binLogETLOne(dataStreamSource: DataStream[JSONObject]): DataStream[(String, String, String, JSONArray)] = {
    val tupleData: DataStream[(String, String, String, JSONArray)] = dataStreamSource.map(line => {
      var data: JSONObject = new JSONObject()
      var jsr: JSONArray = new JSONArray()
      var mergeType = "APPEND"
      val source = line.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      val op = line.getString("op")
      if ("d" == op) {
        // delete: the row image is in "before" and the batch is stream-loaded with merge_type DELETE
        data = line.getJSONObject("before")
        mergeType = "DELETE"
      } else if ("u" == op) {
        // update: take the new row image
        data = line.getJSONObject("after")
        mergeType = "APPEND"
      } else if ("c" == op) {
        // insert
        data = line.getJSONObject("after")
      } else if ("r" == op) {
        // snapshot read during the initial sync
        data = line.getJSONObject("after")
        mergeType = "APPEND"
      }
      jsr.add(data)
      // Salt the table name with a random 0-3 suffix so one hot table is spread over several
      // key groups; FlinkSingleSync strips the suffix again with split("-")(0) after the window
      Tuple4(mergeType, db, table + "-" + new Random().nextInt(4), jsr)
    })
    tupleData
  }
}
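For reference, an update event ("op": "u") from a table lb_customer in ttk_member_1001 (both names illustrative) would leave binLogETLOne roughly as ("APPEND", "ttk_member_1001", "lb_customer-2", [afterRow]), where -2 is the random salt and the single-element JSONArray holds the "after" row image; a delete event instead carries the "before" image with mergeType DELETE.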
3. DorisStreamLoad2.scala
package com.zbkj.util

import org.apache.doris.flink.exception.StreamLoadException
import org.apache.doris.flink.sink.HttpPutBuilder
import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}

import java.util.{Properties, UUID}

class DorisStreamLoad2(props: PropertiesUtil) extends Serializable {

  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoad2])

  private lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // When the request hits an FE it answers with a 307 redirect to a BE, which must be followed
      true
    }
  })

  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    val entity = new StringEntity(jsonData, "UTF-8")
    val streamLoadProp = new Properties()
    streamLoadProp.setProperty("merge_type", mergeType)
    streamLoadProp.setProperty("format", "json")
    streamLoadProp.setProperty("column_separator", ",")
    streamLoadProp.setProperty("line_delimiter", ",")
    streamLoadProp.setProperty("strip_outer_array", "true")
    streamLoadProp.setProperty("exec_mem_limit", "6442450944")
    streamLoadProp.setProperty("strict_mode", "true")
    val httpClient = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    val builder = new HttpPutBuilder()
    // Every stream load needs a unique label
    val label = UUID.randomUUID.toString
    builder.setUrl(loadUrlStr)
      .baseAuth(props.doris_user, props.doris_password)
      .addCommonHeader()
      .setLabel(label)
      .setEntity(entity)
      .addProperties(streamLoadProp)
    handlePreCommitResponse(httpClient.execute(builder.build()))
  }

  def handlePreCommitResponse(response: CloseableHttpResponse): Unit = {
    val statusCode: Int = response.getStatusLine.getStatusCode
    if (statusCode == 200 && response.getEntity != null) {
      val loadResult: String = EntityUtils.toString(response.getEntity)
      logger.info("load Result {}", loadResult)
    } else {
      throw new StreamLoadException("stream load error: " + response.getStatusLine.toString)
    }
  }
}
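Because the upstream window hands over a JSONArray and strip_outer_array is set to true, each stream-load body is a single JSON array whose elements are row objects, for example (columns are illustrative):

[{"id":1,"name":"a"},{"id":2,"name":"b"}]

Doris treats every array element as one row, and merge_type decides whether the whole batch is appended or marked for deletion.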
4. SinkDoris.scala
package com.zbkj.util

import com.alibaba.fastjson2.JSONArray
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class SinkDoris(dorisStreamLoad: DorisStreamLoad2) extends RichSinkFunction[(String, String, String, JSONArray)] {

  override def open(parameters: Configuration): Unit = {}

  override def invoke(value: (String, String, String, JSONArray)): Unit = {
    dorisStreamLoad.loadJson(value._4.toString, value._1, value._2, value._3)
  }

  override def close(): Unit = {}
}
Source: https://blog.csdn.net/xiaofei2017/article/details/131459614