环境说明:
flink 1.15.2
Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production
mysql 版本:5.7
windows11 IDEA 本地运行
先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation
Oracle 开启 log archiving
(1).启用 log archiving
a:以DBA用户连接数据库
sqlplus / as sysdba
b:启用 log archiving (会重启数据库)
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
c:检查 log archiving 是否开启 -- Should now "Database log mode: Archive Mode"
archive log list;(2).注:必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面演示了如何在表/数据库级别上配置它。
为一个特定的表启用补充日志记录:修改表目录。客户添加补充日志数据(所有)列;
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;—为数据库启用补充日志修改数据库添加补充日志数据;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;(3).创建具有权限的Oracle用户
a:创建表空间
sqlplus / as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
b:创建用户并赋权 flinkuser flinkpw
sqlplus / as sysdba
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;
Oracle 建表,并配置补充日志
CREATE TABLE "USER_INFO" (
ID NUMBER,
USERNAME VARCHAR2(255),
PASSWORD VARCHAR2(255),
PRIMARY KEY (ID));ALTER TABLE USER_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Mysql 建表
CREATE TABLE user_new (
id int(11) NOT NULL,
username varchar(255) DEFAULT NULL,
password varchar(255) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Maven依赖
8 8 1.15.2 org.apache.flink flink-clients ${flink.version} org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-runtime-web ${flink.version} org.apache.flink flink-table-planner_2.12 ${flink.version} org.apache.flink flink-connector-jdbc ${flink.version} mysql mysql-connector-java 8.0.29 org.projectlombok lombok 1.18.22 com.ververica flink-sql-connector-mysql-cdc 2.3.0 org.apache.flink flink-connector-jdbc 1.15.2 org.apache.flink flink-connector-base ${flink.version} com.ververica flink-sql-connector-oracle-cdc 2.3.0 org.apache.logging.log4j log4j-slf4j-impl 2.12.1 org.slf4j slf4j-simple 1.7.15 org.apache.logging.log4j log4j-core 2.17.2 org.apache.logging.log4j log4j-api 2.17.2 log4j log4j 1.2.9
demo如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class OracleCdcToMysql { public static void main(String[] args) { //1.获取stream的执行环境 StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); senv.setParallelism(1); //2.创建表执行环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv); String sourceTable = "CREATE TABLE oracle_cdc_source " + "( ID INT, " + "USERNAME STRING, " + "PASSWORD STRING, " + "PRIMARY KEY(ID) NOT ENFORCED) WITH (\n" + "'connector' = 'oracle-cdc',\n" + "'hostname' = '1.1.1.1',\n" + "'port' = '1521',\n" + "'username' = 'flinkcdcuser',\n" + "'password' = 'flinkpw',\n" + "'database-name' = 'LMDB',\n" +//select name from v$database; "'schema-name' = 'TEST',\n" +//select SYS_CONTEXT('USERENV','CURRENT_SCHEMA') CURRENT_SCHEMA from dual; "'debezium.snapshot.mode' = 'schema_only',\n" + //snapshot.mode = initial 快照包括捕获表的结构和数据。指定此值将用捕获表中数据的完整表示填充主题。 //snapshot.mode = schema_only 快照只包含捕获表的结构。如果希望连接器仅捕获快照之后发生的更改的数据,请指定此值。 "'scan.incremental.snapshot.enabled' = 'true',\n" + //scan.incremental.snapshot.enabled 增量快照是一种读取表快照的新机制。增量快照与旧的快照机制相比有很多优点,包括: // (1)在快照读取期间源可以并行;(2)在快照读取期间源可以在块粒度上执行检查点;(3)在快照读取之前源不需要获取ROW SHARE MODE锁。 "'scan.incremental.snapshot.chunk.size' = '8096' ,\n" + //表快照的块大小(行数),当读取表快照时,捕获的表被分割成多个块。 "'scan.snapshot.fetch.size' = '1024',\n" + //读取表快照时每个轮询的最大读取大小。 "'connect.max-retries' = '3',\n" + //连接器应该重试构建Oracle数据库服务器连接的最大重试次数。 "'connection.pool.size'= '20',\n" + //连接池大小 "'debezium.log.mining.strategy' = 'online_catalog',\n" + //online_catalog -使用数据库的当前数据字典来解析对象id,并且不向在线重做日志中写入任何额外的信息。 // 这使得LogMiner的挖掘速度大大提高,但代价是无法跟踪DDL的变化。如果捕获的表模式很少或从不更改,那么这是理想的选择。 "'debezium.log.mining.archive.destination.name' = 'log_archive_dest_1',\n" + "'debezium.log.mining.continuous.mine'='true'," + " 'table-name' = 'USER_INFO'\n" + ")"; tEnv.executeSql(sourceTable);// tEnv.executeSql("select * from oracle_cdc_source").print(); //加上打印后,虽然可以实时看到增删改查记录,但是这些后续操作并不会插入到目标表。如果不加这句打印,则程序无问题 String sinkTable = "CREATE TABLE mysql_cdc_sink (" + " ID INT,\n" + " USERNAME STRING,\n" + " PASSWORD STRING,\n" + "PRIMARY KEY(ID) NOT ENFORCED\n" + ") WITH (\n" + "'connector' = 'jdbc',\n" + "'driver' = 'com.mysql.cj.jdbc.Driver',\n" + "'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true',\n" + "'username' = 'flink_cdc_user',\n" + "'password' = 'flink@cdc',\n"+ " 'table-name' = 'user_new',\n" + " 'connection.max-retry-timeout' = '60s'\n" + ")"; tEnv.executeSql(sinkTable); tEnv.executeSql("insert into mysql_cdc_sink select ID,USERNAME,PASSWORD from oracle_cdc_source"); }}
本地运行控制台是不会输出什么提示的,不像mysql cdc 还可以看到一些查看binlog日志信息。你可以知道程序运行成功与否,Oracle的什么都不会输出。
下图是有打印的,但是只能打印,后续插表动作就失效了。如果不打印,那就是什么都没有。
下图是mysqlCDC的,可以看到有连接,有读取binlog日志,并且还可以打印,后续插表也正常。
具体对应数据类型,还需查看官网,最下面有列出所有对应的数据类型。
具体可用参数,可查官网,也可查阿里介绍,毕竟这是阿里大大的。感觉阿里大大的参数类型更全,更多。具体如何使用,还需研究。MySQL_实时计算 Flink版-阿里云帮助中心
打包到集群运行--后续再补一篇吧,前面几篇都需要。单独补一篇。
来源地址:https://blog.csdn.net/qq_41875667/article/details/131390725