1.采集时间变成时间戳问题
1.1针对时间类型转换
使用 Flink CDC 2.3.0(及以下版本)采集时间类型字段时,源库中形如 yyyy-MM-dd 的日期/时间值,采集后会变成毫秒时间戳。
源库数据样式:
采集后的json数据
{"before":{"id":"4","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":1665741204000,"apply_time":1665741210000,"claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":"None","trading_no":null,"dept_code":null,"update_time":1665742789000,"create_time":1665741337000,"total_ex_goods_num":0},"after":{"id":"4","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":null,"apply_time":1665741210000,"claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":"None","trading_no":null,"dept_code":null,"update_time":1668183700000,"create_time":1665741337000,"total_ex_goods_num":0},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1668154900000,"snapshot":"false","db":"flink_cdc_timestamp","sequence":null,"table":"table_flink_cdc_test1_default_problem","server_id":1,"gtid":null,"file":"","pos":509,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1668154900186,"transaction":null}
1.2解决办法:
1.2.1编写处理时间类:
import io.debezium.spi.converter.CustomConverter;import io.debezium.spi.converter.RelationalColumn;import org.apache.kafka.connect.data.SchemaBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.time.*;import java.time.format.DateTimeFormatter;import java.util.Properties;import java.util.function.Consumer;public class MySqlDateTimeConverter implements CustomConverter { private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class); private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME; private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; private ZoneId timestampZoneId = ZoneId.systemDefault(); @Override public void configure(Properties props) { readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z)); } private void readProps(Properties properties, String settingKey, Consumer callback) { String settingValue = (String) properties.get(settingKey); if (settingValue == null || settingValue.length() == 0) { return; } try { callback.accept(settingValue.trim()); } catch (IllegalArgumentException | DateTimeException e) { logger.error("The {} setting is illegal: {}",settingKey,settingValue); throw e; } } @Override public void converterFor(RelationalColumn column, ConverterRegistration registration) { String sqlType = column.typeName().toUpperCase(); SchemaBuilder schemaBuilder = null; Converter converter = null; if ("DATE".equals(sqlType)) { schemaBuilder = 
SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string"); converter = this::convertDate; } if ("TIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string"); converter = this::convertTime; } if ("DATETIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string"); converter = this::convertDateTime; } if ("TIMESTAMP".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string"); converter = this::convertTimestamp; } if (schemaBuilder != null) { registration.register(schemaBuilder, converter); } } private String convertDate(Object input) { if (input instanceof LocalDate) { return dateFormatter.format((LocalDate) input); } if (input instanceof Integer) { LocalDate date = LocalDate.ofEpochDay((Integer) input); return dateFormatter.format(date); } return null; } private String convertTime(Object input) { if (input instanceof Duration) { Duration duration = (Duration) input; long seconds = duration.getSeconds(); int nano = duration.getNano(); LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano); return timeFormatter.format(time); } return null; } private String convertDateTime(Object input) { if (input instanceof LocalDateTime) { return datetimeFormatter.format((LocalDateTime) input); } return null; } private String convertTimestamp(Object input) { if (input instanceof ZonedDateTime) { // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间 ZonedDateTime zonedDateTime = (ZonedDateTime) input; LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime(); return timestampFormatter.format(localDateTime); } return null; }}
1.2.2 在主方法中引入:
package com.belle.dc.stram;

import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Demo job testing Flink CDC 2.3.0 support for multiple databases/tables and for starting
 * from a given timestamp, with MySQL temporal columns rendered as strings through
 * {@code MySqlDateTimeConverter}.
 */
public class FlinkCdcApp {

    public static String HOST = "localhost";
    public static int PORT = 3306;

    /**
     * Builds the MySQL CDC source, prints every change record as a JSON string and runs the job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(HOST)
                .port(PORT)
                // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
                .databaseList("flink_cdc_timestamp", "magic_api")
                // set captured table
                .tableList("flink_cdc_timestamp.table_flink_cdc_test1_default_problem", "magic_api.magic_api_file_v2")
                .username("root")
                .password("root")
                // Epoch milliseconds (2022-10-31T16:00:00Z); upper-case 'L' so the suffix is not read as '1'.
                .startupOptions(StartupOptions.timestamp(1667232000000L))
                .debeziumProperties(getDebeziumProperties())
                // converts SourceRecord to JSON String
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                // use parallelism 1 for sink to keep message ordering
                .print().setParallelism(1);

        env.execute();
    }

    /**
     * Builds the Debezium properties handed to the source: registers the date/time converter
     * with its output formats and sets the handling modes used by this demo.
     */
    private static Properties getDebeziumProperties() {
        Properties properties = new Properties();
        properties.setProperty("converters", "dateConverters");
        // Adjust to the package the converter class actually lives in.
        properties.setProperty("dateConverters.type", "com.belle.dc.stram.MySqlDateTimeConverter");
        properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
        properties.setProperty("dateConverters.format.time", "HH:mm:ss");
        properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
        // The global read/write lock may affect online traffic, so skip locking.
        // Keys in this Properties object are native Debezium names like the others here;
        // the "debezium." prefix belongs to the SQL/DDL option form only.
        properties.setProperty("snapshot.locking.mode", "none");
        properties.setProperty("include.schema.changes", "true");
        properties.setProperty("bigint.unsigned.handling.mode", "long");
        properties.setProperty("decimal.handling.mode", "double");
        return properties;
    }
}
1.3加入后采集的结果
{"before":null,"after":{"file_path":"magic-api/datasource/","file_content":"this is directory"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667445884000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":1645,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791531,"transaction":null}{"before":null,"after":{"file_path":"magic-api/function/","file_content":"this is directory"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667445884000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":1988,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791532,"transaction":null}{"before":null,"after":{"file_path":"magic-api/api/","file_content":"this is directory"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667445884000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":2329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791533,"transaction":null}{"before":null,"after":{"file_path":"magic-api/api/magic_api_test/","file_content":"this is directory"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667463052000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":2665,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791533,"transaction":null}{"before":null,"after":{"file_path":"magic-api/api/magic_api_test/group.json","file_content":"{\r\n \"properties\" : { },\r\n \"id\" : \"57c3acdd91c34cd791eff5c187906231\",\r\n \"name\" : \"magic_api_test\",\r\n \"type\" : \"api\",\r\n \"parentId\" : \"0\",\r\n \"path\" : \"magic_api_test\",\r\n \"createTime\" : 1667463052248,\r\n 
\"updateTime\" : null,\r\n \"createBy\" : null,\r\n \"updateBy\" : null,\r\n \"paths\" : [ ],\r\n \"options\" : [ ]\r\n}"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667463052000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":3016,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791534,"transaction":null}{"before":null,"after":{"file_path":"magic-api/api/magic_api_test/magic_api测试.ms","file_content":"{\r\n \"properties\" : { },\r\n \"id\" : \"2849b081c7bb4b0eb752c37020b77099\",\r\n \"script\" : null,\r\n \"groupId\" : \"57c3acdd91c34cd791eff5c187906231\",\r\n \"name\" : \"magic_api测试\",\r\n \"createTime\" : 1667463229393,\r\n \"updateTime\" : null,\r\n \"lock\" : null,\r\n \"createBy\" : null,\r\n \"updateBy\" : null,\r\n \"path\" : \"/test\",\r\n \"method\" : \"GET\",\r\n \"parameters\" : [ ],\r\n \"options\" : [ ],\r\n \"requestBody\" : null,\r\n \"headers\" : [ ],\r\n \"paths\" : [ ],\r\n \"responseBody\" : null,\r\n \"description\" : null,\r\n \"requestBodyDefinition\" : null,\r\n \"responseBodyDefinition\" : null\r\n}\r\n================================\r\nreturn 'Hello magic-api'"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667463229000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":4374,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791535,"transaction":null}{"before":{"file_path":"magic-api/api/magic_api_test/magic_api测试.ms","file_content":"{\r\n \"properties\" : { },\r\n \"id\" : \"2849b081c7bb4b0eb752c37020b77099\",\r\n \"script\" : null,\r\n \"groupId\" : \"57c3acdd91c34cd791eff5c187906231\",\r\n \"name\" : \"magic_api测试\",\r\n \"createTime\" : 1667463229393,\r\n \"updateTime\" : null,\r\n \"lock\" : null,\r\n \"createBy\" : null,\r\n \"updateBy\" : null,\r\n \"path\" : \"/test\",\r\n 
\"method\" : \"GET\",\r\n \"parameters\" : [ ],\r\n \"options\" : [ ],\r\n \"requestBody\" : null,\r\n \"headers\" : [ ],\r\n \"paths\" : [ ],\r\n \"responseBody\" : null,\r\n \"description\" : null,\r\n \"requestBodyDefinition\" : null,\r\n \"responseBodyDefinition\" : null\r\n}\r\n================================\r\nreturn 'Hello magic-api'"},"after":{"file_path":"magic-api/api/magic_api_test/magic_api测试.ms","file_content":"{\r\n \"properties\" : { },\r\n \"id\" : \"2849b081c7bb4b0eb752c37020b77099\",\r\n \"script\" : null,\r\n \"groupId\" : \"57c3acdd91c34cd791eff5c187906231\",\r\n \"name\" : \"magic_api测试\",\r\n \"createTime\" : null,\r\n \"updateTime\" : 1667463464144,\r\n \"lock\" : null,\r\n \"createBy\" : null,\r\n \"updateBy\" : null,\r\n \"path\" : \"/test\",\r\n \"method\" : \"GET\",\r\n \"parameters\" : [ ],\r\n \"options\" : [ ],\r\n \"requestBody\" : null,\r\n \"headers\" : [ ],\r\n \"paths\" : [ ],\r\n \"responseBody\" : \"{\\n \\\"code\\\": 1,\\n \\\"message\\\": \\\"success\\\",\\n \\\"data\\\": \\\"Hello magic-api\\\",\\n \\\"timestamp\\\": 1667463381304,\\n \\\"executeTime\\\": 2\\n}\",\r\n \"description\" : null,\r\n \"requestBodyDefinition\" : null,\r\n \"responseBodyDefinition\" : {\r\n \"name\" : \"\",\r\n \"value\" : \"\",\r\n \"description\" : \"\",\r\n \"required\" : false,\r\n \"dataType\" : \"Object\",\r\n \"type\" : null,\r\n \"defaultValue\" : null,\r\n \"validateType\" : \"\",\r\n \"error\" : \"\",\r\n \"expression\" : \"\",\r\n \"children\" : [ {\r\n \"name\" : \"code\",\r\n \"value\" : \"1\",\r\n \"description\" : \"\",\r\n \"required\" : false,\r\n \"dataType\" : \"Integer\",\r\n \"type\" : null,\r\n \"defaultValue\" : null,\r\n \"validateType\" : \"\",\r\n \"error\" : \"\",\r\n \"expression\" : \"\",\r\n \"children\" : [ ]\r\n }, {\r\n \"name\" : \"message\",\r\n \"value\" : \"success\",\r\n \"description\" : \"\",\r\n \"required\" : false,\r\n \"dataType\" : \"String\",\r\n \"type\" : null,\r\n \"defaultValue\" : null,\r\n 
\"validateType\" : \"\",\r\n \"error\" : \"\",\r\n \"expression\" : \"\",\r\n \"children\" : [ ]\r\n }, {\r\n \"name\" : \"data\",\r\n \"value\" : \"Hello magic-api\",\r\n \"description\" : \"\",\r\n \"required\" : false,\r\n \"dataType\" : \"String\",\r\n \"type\" : null,\r\n \"defaultValue\" : null,\r\n \"validateType\" : \"\",\r\n \"error\" : \"\",\r\n \"expression\" : \"\",\r\n \"children\" : [ ]\r\n }, {\r\n \"name\" : \"timestamp\",\r\n \"value\" : \"1667463381304\",\r\n \"description\" : \"\",\r\n \"required\" : false,\r\n \"dataType\" : \"Long\",\r\n \"type\" : null,\r\n \"defaultValue\" : null,\r\n \"validateType\" : \"\",\r\n \"error\" : \"\",\r\n \"expression\" : \"\",\r\n \"children\" : [ ]\r\n }, {\r\n \"name\" : \"executeTime\",\r\n \"value\" : \"2\",\r\n \"description\" : \"\",\r\n \"required\" : false,\r\n \"dataType\" : \"Integer\",\r\n \"type\" : null,\r\n \"defaultValue\" : null,\r\n \"validateType\" : \"\",\r\n \"error\" : \"\",\r\n \"expression\" : \"\",\r\n \"children\" : [ ]\r\n } ]\r\n }\r\n}\r\n================================\r\nvar sql = \"\"\"\r\n select * from magic_backup_record_v2\r\n\"\"\"\r\n\r\nreturn db.select(sql)"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667463464000,"snapshot":"false","db":"magic_api","sequence":null,"table":"magic_api_file_v2","server_id":1,"gtid":null,"file":"","pos":6330,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1668157791537,"transaction":null}{"before":null,"after":{"id":"1","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":"2022-10-14 09:53:24","apply_time":"2022-10-14 
09:53:30","claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":null,"trading_no":null,"dept_code":null,"update_time":"2022-10-14 09:55:41","create_time":"2022-10-14 09:55:37","total_ex_goods_num":0},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667546262000,"snapshot":"false","db":"flink_cdc_timestamp","sequence":null,"table":"table_flink_cdc_test1_default_problem","server_id":1,"gtid":null,"file":"","pos":16892,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791551,"transaction":null}{"before":null,"after":{"id":"2","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":"2022-10-14 09:53:24","apply_time":"2022-10-14 09:53:30","claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":"None","trading_no":null,"dept_code":null,"update_time":"2022-10-14 10:19:49","create_time":"2022-10-14 09:55:37","total_ex_goods_num":0},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667546262000,"snapshot":"false","db":"flink_cdc_timestamp","sequence":null,"table":"table_flink_cdc_test1_default_problem","server_id":1,"gtid":null,"file":"","pos":17269,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791551,"transaction":null}{"before":null,"after":{"id":"4","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":"2022-10-14 09:53:24","apply_time":"2022-10-14 
09:53:30","claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":"None","trading_no":null,"dept_code":null,"update_time":"2022-10-14 10:19:49","create_time":"2022-10-14 09:55:37","total_ex_goods_num":0},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1667546262000,"snapshot":"false","db":"flink_cdc_timestamp","sequence":null,"table":"table_flink_cdc_test1_default_problem","server_id":1,"gtid":null,"file":"","pos":17643,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1668157791552,"transaction":null}{"before":{"id":"4","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":"2022-10-14 09:53:24","apply_time":"2022-10-14 09:53:30","claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":"None","trading_no":null,"dept_code":null,"update_time":"2022-10-14 10:19:49","create_time":"2022-10-14 09:55:37","total_ex_goods_num":0},"after":{"id":"4","claim_amount":0.0,"follow_result_type":"2","logistics_settle_flag":null,"submit_operater":"s","apply_operater":"s","submit_time":null,"apply_time":"2022-10-14 09:53:30","claim_remark":"自动提报调度:异常售后自动提报为(实物追回)。","exception_type":"AG_RETURN_LOSS","apply_id":"df1bbf781000472b8988240a55f9fb73","apply_no":"1","order_sub_no":"1","out_order_id":"1","follow_status":"PROCESSED","follow_source":"oms","loss_reason":null,"responsibility_ascription":"None","trading_no":null,"dept_code":null,"update_time":"2022-11-11 16:21:40","create_time":"2022-10-14 
09:55:37","total_ex_goods_num":0},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1668154900000,"snapshot":"false","db":"flink_cdc_timestamp","sequence":null,"table":"table_flink_cdc_test1_default_problem","server_id":1,"gtid":null,"file":"","pos":509,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1668157791573,"transaction":null}
2.如有问题可以联系
email: z_haoh@163.com
github: GitHub - zhu-h/self_flink_learn
来源地址:https://blog.csdn.net/qq_30529079/article/details/127809317