前言
flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用API可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无奈技术太菜,java各种语法都搞的不是太明白,时间跨度蛮久,中间遇到了不少问题,中途偶然间在群里看到了很久很久以前群友发的一份同步方案,可惜缺少了反序列化的过程,借鉴过来改巴改巴(也改了好几个星期,太菜了),勉强是能跑了,分享出来,能帮到大家一点也就很好了。
方案思路
这个方案的整体思路我先说一下(大佬的思路,我借鉴的),首先我们先使用mysqlcatalog获取到各个表的信息(列名、列类型之类的),然后创建相应的sink table,然后flinkcdc的DataStream是提供了整库获取数据的能力的,所以我们就采用DataStream的方式拿到数据,然后在自定义反序列化里形成
走起:
flink版本:1.15.2(1.15以下版本貌似还没有mysqlcatalog,如果要使用低版本,代码需要调整一下)
flink cdc版本:2.3.0
注意:需先在sink库创建好相应的表(之前忘记写了)
不巴拉了,直接上代码,场景是mysql -> mysql,sink端如果是其他数据库理论上应该是一样,source表需要有主键,这是flinkcdc底层约定好的,没有会报错。
package com.cityos;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Schema;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.catalog.DefaultCatalogTable;import org.apache.flink.table.catalog.ObjectPath;import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.types.Row;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;import java.util.Map;public class FlinkCdcMultiSyncJdbc { private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class); public static void main(String[] args) throws Exception { // source端连接信息 String userName = "root"; String passWord = "18772247265Ldy@"; String host = "localhost"; String db = "flinktest1"; // 如果是整库,tableList = ".*" String tableList = "lidy.nlp_category,lidy.nlp_classify_man_made3"; int port = 33306; // sink连接信息模板 String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"; String sink_username = "root"; String sink_password = "18772247265Ldy@"; String connectorWithBody = " with (\n" + " 'connector' = 'jdbc',\n" + " 'url' = '${sink_url}',\n" + " 'username' = '${sink_username}',\n" + " 'password' = '${sink_password}',\n" + " 'table-name' = '${tableName}'\n" + ")"; connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url) .replace("${sink_username}", sink_username) .replace("${sink_password}", sink_password); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 注册同步的库对应的catalog MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord, String.format("jdbc:mysql://%s:%d", host, port)); List<String> tables = new ArrayList<>(); // 如果整库同步,则从catalog里取所有表,否则从指定表中取表名 if (".*".equals(tableList)) { tables = mysqlCatalog.listTables(db); } else { String[] tableArray = tableList.split(","); for (String table : tableArray) { tables.add(table.split("\\.")[1]); } } // 创建表名和对应RowTypeInfo映射的Map Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap(); Map<String, DataType[]> tableDataTypesMap = Maps.newConcurrentMap(); Map<String, RowType> tableRowTypeMap = Maps.newConcurrentMap(); for (String table : tables) { // 获取mysql catalog中注册的表 ObjectPath objectPath = new ObjectPath(db, table); DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath); // 获取表的Schema Schema schema = catalogBaseTable.getUnresolvedSchema(); // 获取表中字段名列表 String[] fieldNames = new String[schema.getColumns().size()]; // 获取DataType DataType[] fieldDataTypes = new DataType[schema.getColumns().size()]; LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()]; // 获取表字段类型 TypeInformation<?>[] fieldTypes = new TypeInformation[schema.getColumns().size()]; // 获取表的主键 List<String> primaryKeys = schema.getPrimaryKey().get().getColumnNames(); for (int i = 0; i < schema.getColumns().size(); i++) { Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i); fieldNames[i] = column.getName(); fieldDataTypes[i] = (DataType) column.getDataType(); fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType()); logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType(); } RowType rowType = RowType.of(logicalTypes, fieldNames); tableRowTypeMap.put(table, rowType); // 组装sink表ddl sql StringBuilder stmt = new StringBuilder(); String tableName = table; String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName); stmt.append("create table ").append(jdbcSinkTableName).append("(\n"); for (int i = 0; i < fieldNames.length; i++) { String column = fieldNames[i]; String fieldDataType = fieldDataTypes[i].toString(); stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n"); } stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ","))); String formatJdbcSinkWithBody = connectorWithBody .replace("${tableName}", jdbcSinkTableName); String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody; // 创建sink表 log.info("createSinkTableDdl: {}", createSinkTableDdl); tEnv.executeSql(createSinkTableDdl); tableDataTypesMap.put(tableName, fieldDataTypes); tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames)); } // 监控mysql binlog MySqlSource mySqlSource = MySqlSource.<Tuple2<String, Row>>builder() .hostname(host) .port(port) .databaseList(db) .tableList(tableList) .username(userName) .password(passWord) .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap)) .startupOptions(StartupOptions.initial()) .build(); SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining(); StatementSet statementSet = tEnv.createStatementSet(); // dataStream转Table,创建临时视图,插入sink表 for (Map.Entry<String, RowTypeInfo> entry : tableTypeInformationMap.entrySet()) { String tableName = entry.getKey(); RowTypeInfo rowTypeInfo = entry.getValue(); SingleOutputStreamOperator<Row> mapStream = dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo); Table table = tEnv.fromChangelogStream(mapStream); String temporaryViewName = String.format("t_%s", tableName); tEnv.createTemporaryView(temporaryViewName, table); String sinkTableName = String.format("jdbc_sink_%s", tableName); String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName); log.info("add insertSql for {},sql: {}", tableName, insertSql); statementSet.addInsertSql(insertSql); } statementSet.execute(); }}
对应的反序列化代码
package com.cityos;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;import com.ververica.cdc.debezium.utils.TemporalConversions;import io.debezium.data.Envelope;import io.debezium.data.SpecialValueDecimal;import io.debezium.data.VariableScaleDecimal;import io.debezium.time.*;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;import org.apache.flink.table.data.DecimalData;import org.apache.flink.table.data.RowData;import org.apache.flink.table.data.StringData;import org.apache.flink.table.data.TimestampData;import org.apache.flink.table.types.logical.DecimalType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.types.Row;import org.apache.flink.types.RowKind;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Decimal;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import java.math.BigDecimal;import java.nio.ByteBuffer;import java.time.Instant;import java.time.LocalDateTime;import java.time.ZoneId;import java.util.Map;public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema { private final Map<String, RowType> tableRowTypeMap; private Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap(); CustomDebeziumDeserializer(Map tableRowTypeMap) { this.tableRowTypeMap = tableRowTypeMap; for (String tablename : this.tableRowTypeMap.keySet()) { RowType rowType = this.tableRowTypeMap.get(tablename); DeserializationRuntimeConverter physicalConverter =createNotNullConverter(rowType); this.physicalConverterMap.put(tablename,physicalConverter); } } @Override public void deserialize(SourceRecord record, Collector out) throws Exception { Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); Schema valueSchema = record.valueSchema(); Struct source = value.getStruct("source"); String tablename = source.get("table").toString(); DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename); if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { Row insert = extractAfterRow(value, valueSchema, physicalConverter); insert.setKind(RowKind.INSERT); out.collect(Tuple2.of(tablename,insert)); } else if (op == Envelope.Operation.DELETE) { Row delete = extractBeforeRow(value, valueSchema, physicalConverter); delete.setKind(RowKind.DELETE); out.collect(Tuple2.of(tablename,delete)); } else { Row before = extractBeforeRow(value, valueSchema, physicalConverter); before.setKind(RowKind.UPDATE_BEFORE); out.collect(Tuple2.of(tablename,before)); Row after = extractAfterRow(value, valueSchema, physicalConverter); after.setKind(RowKind.UPDATE_AFTER); out.collect(Tuple2.of(tablename,after)); } } private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception { Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema(); Struct after = value.getStruct(Envelope.FieldName.AFTER); return (Row) physicalConverter.convert(after, afterSchema); } private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception { Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema(); Struct before = value.getStruct(Envelope.FieldName.BEFORE); return (Row) physicalConverter.convert(before, beforeSchema); } @Override public TypeInformation<Tuple2<String, Row>> getProducedType() { return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() { }); } public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) { switch (type.getTypeRoot()) { case NULL: return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { return null; } }; case BOOLEAN: return convertToBoolean(); case TINYINT: return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { return Byte.parseByte(dbzObj.toString()); } }; case SMALLINT: return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { return Short.parseShort(dbzObj.toString()); } }; case INTEGER: case INTERVAL_YEAR_MONTH: return convertToInt(); case BIGINT: case INTERVAL_DAY_TIME: return convertToLong(); case DATE: return convertToDate(); case TIME_WITHOUT_TIME_ZONE: return convertToTime(); case TIMESTAMP_WITHOUT_TIME_ZONE: return convertToTimestamp(ZoneId.of("UTC")); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC")); case FLOAT: return convertToFloat(); case DOUBLE: return convertToDouble(); case CHAR: case VARCHAR: return convertToString(); case BINARY: case VARBINARY: return convertToBinary(); case DECIMAL: return createDecimalConverter((DecimalType) type); case ROW: return createRowConverter( (RowType) type); case ARRAY: case MAP: case MULTISET: case RAW: default: throw new UnsupportedOperationException("Unsupported type: " + type); } } private static DeserializationRuntimeConverter convertToBoolean() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Boolean) { return dbzObj; } else if (dbzObj instanceof Byte) { return (byte) dbzObj == 1; } else if (dbzObj instanceof Short) { return (short) dbzObj == 1; } else { return Boolean.parseBoolean(dbzObj.toString()); } } }; } private static DeserializationRuntimeConverter convertToInt() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Integer) { return dbzObj; } else if (dbzObj instanceof Long) { return ((Long) dbzObj).intValue(); } else { return Integer.parseInt(dbzObj.toString()); } } }; } private static DeserializationRuntimeConverter convertToLong() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Integer) { return ((Integer) dbzObj).longValue(); } else if (dbzObj instanceof Long) { return dbzObj; } else { return Long.parseLong(dbzObj.toString()); } } }; } private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) { final int precision = decimalType.getPrecision(); final int scale = decimalType.getScale(); return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { BigDecimal bigDecimal; if (dbzObj instanceof byte[]) { // decimal.handling.mode=precise bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj); } else if (dbzObj instanceof String) { // decimal.handling.mode=string bigDecimal = new BigDecimal((String) dbzObj); } else if (dbzObj instanceof Double) { // decimal.handling.mode=double bigDecimal = BigDecimal.valueOf((Double) dbzObj); } else { if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { SpecialValueDecimal decimal = VariableScaleDecimal.toLogical((Struct) dbzObj); bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); } else { // fallback to string bigDecimal = new BigDecimal(dbzObj.toString()); } } return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } }; } private static DeserializationRuntimeConverter convertToDouble() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Float) { return ((Float) dbzObj).doubleValue(); } else if (dbzObj instanceof Double) { return dbzObj; } else { return Double.parseDouble(dbzObj.toString()); } } }; } private static DeserializationRuntimeConverter convertToFloat() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Float) { return dbzObj; } else if (dbzObj instanceof Double) { return ((Double) dbzObj).floatValue(); } else { return Float.parseFloat(dbzObj.toString()); } } }; } private static DeserializationRuntimeConverter convertToDate() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay(); } }; } private static DeserializationRuntimeConverter convertToTime() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Long) { switch (schema.name()) { case MicroTime.SCHEMA_NAME:return (int) ((long) dbzObj / 1000); case NanoTime.SCHEMA_NAME:return (int) ((long) dbzObj / 1000_000); } } else if (dbzObj instanceof Integer) { return dbzObj; } // get number of milliseconds of the day return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000; } }; } private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof Long) { switch (schema.name()) { case Timestamp.SCHEMA_NAME:return TimestampData.fromEpochMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME:long micro = (long) dbzObj;return TimestampData.fromEpochMillis( micro / 1000, (int) (micro % 1000 * 1000)); case NanoTimestamp.SCHEMA_NAME:long nano = (long) dbzObj;return TimestampData.fromEpochMillis( nano / 1000_000, (int) (nano % 1000_000)); } } LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); return TimestampData.fromLocalDateTime(localDateTime); } }; } private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp( ZoneId serverTimeZone) { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof String) { String str = (String) dbzObj; // TIMESTAMP_LTZ type is encoded in string type Instant instant = Instant.parse(str); return TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(instant, serverTimeZone)); } throw new IllegalArgumentException( "Unable to convert to TimestampData from unexpected value '" + dbzObj + "' of type " + dbzObj.getClass().getName()); } }; } private static DeserializationRuntimeConverter convertToString() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { return StringData.fromString(dbzObj.toString()); } }; } private static DeserializationRuntimeConverter convertToBinary() { return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) { if (dbzObj instanceof byte[]) { return dbzObj; } else if (dbzObj instanceof ByteBuffer) { ByteBuffer byteBuffer = (ByteBuffer) dbzObj; byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); return bytes; } else { throw new UnsupportedOperationException("Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); } } }; } private static DeserializationRuntimeConverter createRowConverter(RowType rowType) { final DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream() .map(RowType.RowField::getType) .map( logicType -> createNotNullConverter( logicType)) .toArray(DeserializationRuntimeConverter[]::new); final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); return new DeserializationRuntimeConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Object dbzObj, Schema schema) throws Exception { Struct struct = (Struct) dbzObj; int arity = fieldNames.length; Row row = new Row(arity); for (int i = 0; i < arity; i++) { String fieldName = fieldNames[i]; Field field = schema.field(fieldName); if (field == null) { row.setField(i, null); } else { Object fieldValue = struct.getWithoutDefault(fieldName); Schema fieldSchema = schema.field(fieldName).schema(); Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema); row.setField(i, convertedField); } } return row; } }; } private static Object convertField( DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema) throws Exception { if (fieldValue == null) { return null; } else { return fieldConverter.convert(fieldValue, fieldSchema); } }}
再贴上我的pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cityos</groupId> <artifactId>flink_1_15</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.3.7.RELEASE</spring-boot.version> <flink.version>1.15.2</flink.version> <scala.binary.version>2.12</scala.binary.version> <!-- <scala.version>2.12.12</scala.version>--> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> <repository> <id>spring</id> <url>https://maven.aliyun.com/repository/spring</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <!-- flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <!-- flink-connector-jdbc --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>${flink.version}</version> </dependency> <!-- mysql-cdc--> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.3.0</version> </dependency> <!-- mysql--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.29</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> <scope>compile</scope> </dependency> <!-- https://mvnrepository.com/artifact/log4j/log4j --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.3.7.RELEASE</version> <configuration> <mainClass>com.cityos.Flink1142Application</mainClass> </configuration> <executions> <execution> <id>repackage</id> <goals><goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build></project>
有兴趣的看看,没兴趣的或者感觉不屑的划过就好,莫喷我,代码写的确实是丑。
来源地址:https://blog.csdn.net/qq_36062467/article/details/128117647
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
软考中级精品资料免费领
- 历年真题答案解析
- 备考技巧名师总结
- 高频考点精准押题
- 资料下载
- 历年真题
193.9 KB下载数265
191.63 KB下载数245
143.91 KB下载数1148
183.71 KB下载数642
644.84 KB下载数2756