
Flink CDC 2.4: Real-Time Whole-Database Sync from MySQL to Doris

2023-09-05 07:12


Environment

        Flink 1.15.4 

Why write it ourselves

        Quite a few tools already support no-code real-time sync from MySQL to Doris,

        e.g. the feature package released by SelectDB,

                as well as Dinky, SeaTunnel, TIS, and others.

         However, many of them either do not support table schema changes or do not support multiple sinks. Our business requires real-time schema changes down to the column level, because columns get modified: types changed, names changed, columns dropped and added.

        So to get whole-database sync together with real-time schema evolution, we had to write it ourselves.

Required jars

        flink-doris-connector-1.15-1.4.0.jar  -- enables one-click sync of (tens of) thousands of tables

        flink-sql-connector-mysql-cdc-2.4.0.jar -- bundles all required dependencies, so there is no need to import debezium, cdc, etc. separately
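
For reference, a minimal sbt sketch of the equivalent dependencies. The Maven coordinates below are the published ones that match these jar names, but verify them against your repository:

        libraryDependencies ++= Seq(
          // Doris connector built against Flink 1.15
          "org.apache.doris" % "flink-doris-connector-1.15"    % "1.4.0",
          // fat jar: bundles Debezium and the CDC runtime
          "com.ververica"    % "flink-sql-connector-mysql-cdc" % "2.4.0"
        )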

Process

        1. A script creates the databases and tables

        2. A schema-sync program

        3. The Flink CDC program

Compared with the first version (Using Flink CDC to sync MySQL data and schemas into Apache Doris in real time), efficiency has improved:

        In the first version, the windowed aggregation after keyBy during the initial full sync caused data skew; this version salts the keys (see the sketch after this list)

        The aggregated payload was changed from string concatenation to a JSONArray to avoid backpressure caused by the aggregation; string concatenation becomes far too slow at larger data volumes
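
The skew fix amounts to salting the key with a small random suffix before the window and stripping it afterwards. A minimal sketch of the pattern with illustrative names (the real code lives in FlinkCDCSyncETL.scala below):

        import java.util.Random

        // Salt: fan one hot table out over 4 window keys
        // (assumes table names contain no "-")
        def saltKey(table: String): String = table + "-" + new Random().nextInt(4)

        // Unsalt after the windowed reduce, before the Doris Stream Load
        def unsaltKey(salted: String): String = salted.split("-")(0)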

Flink CDC code

        1. FlinkSingleSync.scala

package com.zbkj.sync

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.zbkj.util.SinkBuilder.getKafkaSink
import com.zbkj.util._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

import java.util.Properties

object FlinkSingleSync {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val parameters: ParameterTool = ParameterTool.fromArgs(args)
    val memberID = parameters.getInt("memberID", 0)
    val source = parameters.get("source", "")
    val log = parameters.getBoolean("log", true)
    if (memberID == 0) {
      sys.exit(0)
    }
    val thisMember = "ttk_member_%d".format(memberID)
    val jobName = "Sync Member %d".format(memberID)
    val syncTopic = "sync_data_%d".format(memberID)
    println(syncTopic)
    val sourceFormat = SourceFormat.sourceFormat(source)
    env.setParallelism(4) // overrides props.parallelism set above

    // Enable checkpointing at the given interval in ms (disabled by default)
    env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout (default 10 minutes)
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    env.getCheckpointConfig.setCheckpointStorage("file:///data/flink-checkpoints/sync/%d".format(memberID))
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // Only one checkpoint runs at a time by default; raising the limit can
    // improve overall checkpointing throughput
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Tolerable checkpoint failures (default 0: no failure tolerated)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
    env.disableOperatorChaining()
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30000L))

    val dataBaseList = thisMember
    var tableList = thisMember + ".*"
    if (!log) {
      // NOTE: the original expression was cut off by the source page's HTML
      // escaping right after "(?", apparently a regex with a lookbehind that
      // excludes log tables; the exact pattern is not recoverable.
      tableList = "lb_crm_customer_log|.*(?" // truncated in the original
    }

    // NOTE: the source construction below was also lost in extraction. This is
    // a reconstruction from the imports above and the PropertiesUtil fields
    // used elsewhere in the article; the rds_* property names are assumptions.
    val customConverterConfigs = new java.util.HashMap[String, Object]()
    customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "NUMERIC")
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.rds_host)
      .port(props.rds_port)
      .databaseList(dataBaseList)
      .tableList(tableList)
      .username(props.rds_user)
      .password(props.rds_password)
      .startupOptions(sourceFormat)
      .includeSchemaChanges(true) // needed so DDL events reach the stream below
      .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
      .build()
    val dorisStreamLoad = new DorisStreamLoad2(props)

    val streamSource: DataStream[JSONObject] = env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String](), "MySQL Source")
      .map(line => JSON.parseObject(line)).setParallelism(4)

    // Records without "op" are schema-change (DDL) events; the rest are DML
    val DDLSqlStream: DataStream[JSONObject] = streamSource.filter(line => !line.containsKey("op")).uid("ddlSqlStream")
    val DMLStream: DataStream[JSONObject] = streamSource.filter(line => line.containsKey("op")).uid("dmlStream")

    val DMLDataStream = FlinkCDCSyncETL.binLogETLOne(DMLStream)
    // Group by (mergeType, db, salted table), merge the JSON arrays within a
    // processing-time window, then strip the salt off the table name
    val keyByDMLDataStream: DataStream[(String, String, String, JSONArray)] = DMLDataStream
      .keyBy(keys => (keys._1, keys._2, keys._3))
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(props.window_time_milliseconds)))
      .reduce((itemFirst, itemSecond) => (itemFirst._1, itemFirst._2, itemFirst._3, combineJsonArray(itemFirst._4, itemSecond._4)))
      .map(line => (line._1, line._2, line._3.split("-")(0), line._4))
      .name("Group and aggregate").uid("keyBy")

    keyByDMLDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Write data to Doris").uid("SinkDoris").setParallelism(4)

    val DDLKafkaSink = getKafkaSink("schema_change")
    DDLSqlStream.map(jsObj => jsObj.toJSONString()).sinkTo(DDLKafkaSink).name("DDL to Kafka").uid("SinkDDLKafka")

    // Per-table change counters so downstream consumers know which tables moved
    val kafkaSink = getKafkaSink(syncTopic)
    keyByDMLDataStream.map(line => (line._2, line._3, 1)).filter(!_._2.endsWith("_sql"))
      .keyBy(keys => (keys._1, keys._2))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(2)
      .map(line => {
        val json = new JSONObject()
        json.put("member_id", line._1)
        json.put("table", line._2)
        json.toJSONString()
      }).sinkTo(kafkaSink).name("Changed tables to Kafka").uid("syncDataTableToKafka")

    env.execute(jobName)
  }

  def combineJsonArray(jsr1: JSONArray, jsr2: JSONArray): JSONArray = {
    jsr1.addAll(jsr2)
    jsr1
  }
}
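
With the object above packaged into a jar, the job would be submitted roughly like this (the jar name is illustrative, and the valid --source values depend on the SourceFormat helper, which the article does not show):

        flink run -c com.zbkj.sync.FlinkSingleSync sync-job.jar --memberID 1 --source initial --log true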

2. FlinkCDCSyncETL.scala

package com.zbkj.util

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.DataStream

import java.util.Random

object FlinkCDCSyncETL {

  def binLogETLOne(dataStreamSource: DataStream[JSONObject]): DataStream[(String, String, String, JSONArray)] = {
    // Map each Debezium change event to (mergeType, db, salted table, rows).
    // The random "-<0..3>" suffix spreads one hot table over four keys to
    // avoid skew in the windowed aggregation; it is stripped after the window.
    val tupleData: DataStream[(String, String, String, JSONArray)] = dataStreamSource.map(line => {
      var data: JSONObject = new JSONObject()
      var mergeType = "APPEND"
      val jsr: JSONArray = new JSONArray()
      val source = line.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      val op = line.getString("op")
      if ("d" == op) {
        // delete: the row image is in "before"
        data = line.getJSONObject("before")
        mergeType = "DELETE"
      } else if ("u" == op || "c" == op || "r" == op) {
        // update / create / snapshot read: the row image is in "after"
        data = line.getJSONObject("after")
      }
      jsr.add(data)
      Tuple4(mergeType, db, table + "-" + new Random().nextInt(4), jsr)
    })
    tupleData
  }
}
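
To make the op mapping concrete, here is a hand-written example of a Debezium update event after JSON.parseObject, and the tuple the map step produces (all field values are illustrative):

        import com.alibaba.fastjson2.JSON

        val event = JSON.parseObject(
          """{"op":"u",
            | "source":{"db":"ttk_member_1","table":"lb_order"},
            | "before":{"id":1,"status":0},
            | "after":{"id":1,"status":1}}""".stripMargin)
        // binLogETLOne turns this into ("APPEND", "ttk_member_1", "lb_order-<salt>", [after row]);
        // a "d" event would instead yield ("DELETE", ..., [before row])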

3. DorisStreamLoad2.scala

package com.zbkj.util

import org.apache.doris.flink.exception.StreamLoadException
import org.apache.doris.flink.sink.HttpPutBuilder
import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}

import java.util.{Properties, UUID}

class DorisStreamLoad2(props: PropertiesUtil) extends Serializable {

  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoad2])

  private lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // When the request hits an FE, the 307 redirect to a BE must be followed
      true
    }
  })

  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    val entity = new StringEntity(jsonData, "UTF-8")
    val streamLoadProp = new Properties()
    streamLoadProp.setProperty("merge_type", mergeType)
    streamLoadProp.setProperty("format", "json")
    streamLoadProp.setProperty("column_separator", ",")
    streamLoadProp.setProperty("line_delimiter", ",")
    streamLoadProp.setProperty("strip_outer_array", "true")
    streamLoadProp.setProperty("exec_mem_limit", "6442450944")
    streamLoadProp.setProperty("strict_mode", "true")
    val httpClient = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val builder = new HttpPutBuilder()
      val label = UUID.randomUUID.toString
      builder.setUrl(loadUrlStr)
        .baseAuth(props.doris_user, props.doris_password)
        .addCommonHeader()
        .setLabel(label)
        .setEntity(entity)
        .addProperties(streamLoadProp)
      handlePreCommitResponse(httpClient.execute(builder.build()))
    } finally {
      httpClient.close()
    }
  }

  // Treat anything other than HTTP 200 with a body as a failed load
  private def handlePreCommitResponse(response: CloseableHttpResponse): Unit = {
    val statusCode: Int = response.getStatusLine.getStatusCode
    if (statusCode == 200 && response.getEntity != null) {
      val loadResult: String = EntityUtils.toString(response.getEntity)
      logger.info("load Result {}", loadResult)
    } else {
      throw new StreamLoadException("stream load error: " + response.getStatusLine.toString)
    }
  }
}
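
A minimal usage sketch, assuming an initialized PropertiesUtil. Per Doris Stream Load semantics, merge_type must be APPEND, DELETE, or MERGE, and because strip_outer_array is set the payload must be a JSON array:

        val loader = new DorisStreamLoad2(props)
        // table and row values are illustrative
        loader.loadJson("""[{"id":1,"status":1}]""", "APPEND", "ttk_member_1", "lb_order")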

4. SinkDoris.scala

package com.zbkj.util

import com.alibaba.fastjson2.JSONArray
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Forwards each (mergeType, db, table, rows) tuple to Doris via Stream Load
class SinkDoris(dorisStreamLoad: DorisStreamLoad2) extends RichSinkFunction[(String, String, String, JSONArray)] {

  override def open(parameters: Configuration): Unit = {}

  override def invoke(value: (String, String, String, JSONArray)): Unit = {
    dorisStreamLoad.loadJson(value._4.toString, value._1, value._2, value._3)
  }

  override def close(): Unit = {}
}

Source: https://blog.csdn.net/xiaofei2017/article/details/131459614
