文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot怎么整合Canal方法

2023-07-04 20:37

关注

这篇“SpringBoot怎么整合Canal方法”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“SpringBoot怎么整合Canal方法”文章吧。

pom.xml 添加 canal.client 依赖

(1.1.5 改动很大,这儿客户端用 1.1.4)

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.2.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>top.yueshushu</groupId>    <artifactId>learn</artifactId>    <version>1.0-SNAPSHOT</version>    <name>Canal</name>    <description>学习 Canal</description>    <properties>        <java.version>1.8</java.version>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>        </dependency>        <!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-configuration-processor</artifactId>            <optional>true</optional>        </dependency>        <!--导入自动热步署的依赖-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-devtools</artifactId>            <optional>true</optional>        </dependency>        <!--引入MySql的驱动-->        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>        </dependency>        <!--引入springboot与mybatis整合的依赖-->        <dependency>            <groupId>org.mybatis.spring.boot</groupId>            <artifactId>mybatis-spring-boot-starter</artifactId>            <version>2.1.4</version>        </dependency>        <!-- 引入pagehelper分页插件 -->        <dependency>            <groupId>com.github.pagehelper</groupId>            <artifactId>pagehelper-spring-boot-starter</artifactId>            <version>1.2.5</version>        </dependency>        <!--添加 druid-spring-boot-starter的依赖的依赖-->        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>druid-spring-boot-starter</artifactId>            <version>1.1.14</version>        </dependency>        <!--SpringBoot 的aop 模块-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-aop</artifactId>        </dependency>        <!--添加canal的依赖. 重要.  使用  1.1.4-->        <dependency>            <groupId>com.alibaba.otter</groupId>            <artifactId>canal.client</artifactId>            <version>1.1.4</version>        </dependency>        <dependency>            <groupId>joda-time</groupId>            <artifactId>joda-time</artifactId>            <version>2.9.4</version>        </dependency>    </dependencies>    <build>        <!--将该目录下的文件全部打包成类的路径-->        <resources>            <resource>                <directory>src/main/resources</directory>            </resource>        </resources>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

业务功能处理

简单连接程序

    @Test    public void connectionTest() {        //1. 创建连接  填充对应的地址信息 ,要监控的实例和相应的用户名和密码        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress(                        "127.0.0.1", 11111                ),                "example",                "canal",                "canal"        );        //2. 进行连接        canalConnector.connect();        log.info(">>>连接成功:{}", canalConnector);    }

32.179 [main] INFO top.yueshushu.learn.CanalDemoTest - >>>连接成功:com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3

单次获取数据

    @Test    public void getDataTest() {        //1. 创建连接        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress("127.0.0.1", 11111),                "example",                "canal",                "canal"        );        // 进行连接        canalConnector.connect();        //3. 注册,看使用哪个数据库表        canalConnector.subscribe("springboot.user");        //4. 获取 1条数据        Message message = canalConnector.get(1);        log.info("获取的数据:id:{},数据:{}", message.getId(), message);        if (message.getId() == -1) {            log.info(">>>未获取到数据");            return;        }        //5. 获取相应的数据集合        List<CanalEntry.Entry> entries = message.getEntries();        for (CanalEntry.Entry entry : entries) {            log.info(">>>获取数据 {}", entry);            //获取表名            CanalEntry.Header header = entry.getHeader();            log.info(">>>获取表名:{}", header.getTableName());            CanalEntry.EntryType entryType = entry.getEntryType();            log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());            //获取数据            ByteString storeValue = entry.getStoreValue();            log.info(">>>输出存储的值:{}", storeValue);        }    }

SpringBoot怎么整合Canal方法

在主库里面插入一条数据

insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用户',24,'男','学习canal');

再次执行:

SpringBoot怎么整合Canal方法

循环获取数据

    @Test    public void getNowDataTest() {        //1. 创建连接        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress("127.0.0.1", 11111),                "example",                "canal",                "canal"        );        // 进行连接        canalConnector.connect();        //3. 注册,看使用哪个数据库表        canalConnector.subscribe("springboot.user");        for (;;) {            //4. 获取 1条数据            Message message = canalConnector.get(1);            log.info("获取的数据:id:{},数据:{}", message.getId(), message);            if (message.getId() == -1) {                log.info(">>>未获取到数据");                try {                    TimeUnit.MILLISECONDS.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                continue;            }            //5. 获取相应的数据集合            List<CanalEntry.Entry> entries = message.getEntries();            for (CanalEntry.Entry entry : entries) {                log.info(">>>获取数据 {}", entry);                //获取表名                CanalEntry.Header header = entry.getHeader();                log.info(">>>获取表名:{}", header.getTableName());                CanalEntry.EntryType entryType = entry.getEntryType();                log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());                //获取数据                ByteString storeValue = entry.getStoreValue();                log.info(">>>输出存储的值:{}", storeValue);            }        }    }

可以随时获取相应的数据变更信息。

会发现, storeValue 的值是很难解读的。 需要将这个数据解析出来。

解析 storeValue 值

    @Test    public void convertDataTest() throws Exception {        //1. 创建连接        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress("127.0.0.1", 11111),                "example",                "canal", "canal"        );        //2. 进行连接        canalConnector.connect();        canalConnector.subscribe("springboot.user");        for (;;) {            //获取信息            Message message = canalConnector.get(1);            if (message.getId() == -1L) {                // log.info("未获取到数据");                try {                    TimeUnit.MILLISECONDS.sleep(100);                } catch (InterruptedException e) {                    e.printStackTrace();                }                continue;            }            List<CanalEntry.Entry> entryList = message.getEntries();            //对获取到的数据进行处理            log.info(">>获取到{}条数据", entryList.size());            for (CanalEntry.Entry entry : entryList) {                CanalEntry.Header header = entry.getHeader();                log.info(">>>获取表名:{}", header.getTableName());                //获取类型.                CanalEntry.EntryType entryType = entry.getEntryType();                log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name());                //获取存入日志的值                ByteString storeValue = entry.getStoreValue();                //将这个值进行解析                CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);                String sql = rowChange.getSql();                log.info(">>>获取对应的sql:{}", sql);                // 这个sql 可能是 批量的sql语句                List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();                for (CanalEntry.RowData rowData : rowDatasList) {                    log.info(">>>获取信息:{}", rowData);                    //对数据进行处理                    List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();                    List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();                    beforeColumnsList.forEach(                            n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(),                                    n.getValue(), n.getUpdated())                    );                    afterColumnsList.forEach(                            n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())                    );                }            }        }    }

再次执行sql

insert into springboot.user(id,name,age,sex,description) values(2,'canal添加用户2',25,'男','学习canal2');

SpringBoot怎么整合Canal方法

不同的类型进行不同的处理

发现 其他类型的 如: TRANSACTIONBEGIN 也进行了处理

    @Test    public void dataTypeTest() throws Exception {        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress(                        "127.0.0.1", 11111                ),                "example",                "canal", "canal"        );        canalConnector.connect();        canalConnector.subscribe("springboot.user");        for(;;){            Message message = canalConnector.get(1);            if (message.getId() == -1) {                TimeUnit.SECONDS.sleep(1);                continue;            }            List<CanalEntry.Entry> entries = message.getEntries();            for (CanalEntry.Entry entry : entries) {                CanalEntry.EntryType entryType = entry.getEntryType();                //只要 RowData 数据类型的                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {                    continue;                }                String tableName = entry.getHeader().getTableName();                log.info(">>>对表 {} 进行操作", tableName);                ByteString storeValue = entry.getStoreValue();                RowChange rowChange = RowChange.parseFrom(storeValue);                //行改变                CanalEntry.EventType eventType = rowChange.getEventType();                switch (eventType) {                    case INSERT: {                        insertHandler(rowChange);                        break;                    }                    case UPDATE: {                        updateHandler(rowChange);                        break;                    }                    case DELETE: {                        deleteHandler(rowChange);                        break;                    }                    default: {                        break;                    }                }            }        }    }    private void deleteHandler(RowChange rowChange) {        log.info(">>>>执行删除的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();            for (CanalEntry.Column column : beforeColumnsList) {                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());            }        }    }    private void updateHandler(RowChange rowChange) {        log.info(">>>执行更新的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(                    Collectors.toMap(                            CanalEntry.Column::getName,                            CanalEntry.Column::getValue                    )            );            Map<String, String> afterValueMap = afterColumnsList.stream().collect(                    Collectors.toMap(                            CanalEntry.Column::getName,                            CanalEntry.Column::getValue                    )            );            beforeValueMap.forEach((column, beforeValue) -> {                String afterValue = afterValueMap.get(column);                Boolean update = beforeValue.equals(afterValue);                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,                        update);            });        }    }        private void insertHandler(RowChange rowChange) {        log.info(">>>执行添加 的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();            for (CanalEntry.Column column : afterColumnsList) {                if (!StringUtils.hasText(column.getValue())) {                    continue;                }                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());            }        }    }

插入,更新,删除,分别进行了处理.

先启动测试程序:

SpringBoot怎么整合Canal方法

不打印任何信息。

主表执行添加语句:

insert into springboot.user(id,name,age,sex,description) values(4,'canal添加用户4',25,'男','学习canal4');

会打印信息:

SpringBoot怎么整合Canal方法

这个可读性就非常高了.

主表执行修改的操作.

update springboot.user set name='开开心心',age=26,description='岳泽霖' where id =4;

更新时,若每一个字段都跟原先一样,不会产生日志消费。

SpringBoot怎么整合Canal方法

主表执行删除的操作:

delete from springboot.user where id =4;

SpringBoot怎么整合Canal方法

上面的获取,都是一条数据一条数据获取的。效率比较低

一次性获取多条数据

    @Test    public void dataMoreTest() throws Exception {        //1. 创建 canal连接对象        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress(                        "127.0.0.1", 11111                ),                "example",                "canal",                "canal"        );        canalConnector.connect();        // 订阅哪个对象        canalConnector.subscribe("springboot.user");        for (; ; ) {           // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);            Message message = canalConnector.get(3);            if (message.getId() == -1) {                // 未获取到数据                continue;            }            List<CanalEntry.Entry> entries = message.getEntries();            for (CanalEntry.Entry entry : entries) {                CanalEntry.EntryType entryType = entry.getEntryType();                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {                    continue;                }                String tableName = entry.getHeader().getTableName();                log.info(">>>>对表{} 执行操作", tableName);                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                //对类型进行处理                CanalEntry.EventType eventType = rowChange.getEventType();                switch (eventType) {                    case INSERT: {                        insertHandler(rowChange);                        break;                    }                    case UPDATE: {                        updateHandler(rowChange);                        break;                    }                    case DELETE: {                        deleteHandler(rowChange);                        break;                    }                    default: {                        break;                    }                }            }        }    }    private void deleteHandler(CanalEntry.RowChange rowChange) {        log.info(">>>>执行删除的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();            for (CanalEntry.Column column : beforeColumnsList) {                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());            }        }    }    private void updateHandler(CanalEntry.RowChange rowChange) {        log.info(">>>执行更新的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(                    Collectors.toMap(                            CanalEntry.Column::getName,                            CanalEntry.Column::getValue                    )            );            Map<String, String> afterValueMap = afterColumnsList.stream().collect(                    Collectors.toMap(                            CanalEntry.Column::getName,                            CanalEntry.Column::getValue                    )            );            beforeValueMap.forEach((column, beforeValue) -> {                String afterValue = afterValueMap.get(column);                Boolean update = beforeValue.equals(afterValue);                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,                        update);            });        }    }        private void insertHandler(CanalEntry.RowChange rowChange) {        log.info(">>>执行添加 的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();            for (CanalEntry.Column column : afterColumnsList) {                if (!StringUtils.hasText(column.getValue())) {                    continue;                }                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());            }        }    }

修改点:

// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);    Message message = canalConnector.get(3);

.get(3) 表示 一次性获取3条记录.

canalConnector.get(3, 5L, TimeUnit.SECONDS); 表示5秒之内获取3条记录,

有两个触发条件,一个是获取了3条,一个是到了5秒。

效果展示信息与之前是一致的,就不重新演示了。

ack 配置信息

    @Test    public void dataMoreTest() throws Exception {        //1. 创建 canal连接对象        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress(                        "127.0.0.1", 11111                ),                "example",                "canal",                "canal"        );        canalConnector.connect();        // 订阅哪个对象        canalConnector.subscribe("springboot.user");        for (; ; ) {             Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);            if (message.getId() == -1) {                // 未获取到数据                TimeUnit.MILLISECONDS.sleep(500);                continue;            }            log.info(">>>>获取对应的 id: {}",message.getId());            List<CanalEntry.Entry> entries = message.getEntries();            for (CanalEntry.Entry entry : entries) {                CanalEntry.EntryType entryType = entry.getEntryType();                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {                    continue;                }                String tableName = entry.getHeader().getTableName();                log.info(">>>>对表{} 执行操作", tableName);                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                //对类型进行处理                CanalEntry.EventType eventType = rowChange.getEventType();                switch (eventType) {                    case INSERT: {                        insertHandler(rowChange);                        break;                    }                    case UPDATE: {                        updateHandler(rowChange);                        break;                    }                    case DELETE: {                        deleteHandler(rowChange);                        break;                    }                    default: {                        break;                    }                }            }            //进行回滚           // canalConnector.rollback();            //确认ack 配置           canalConnector.ack(message.getId());        }    }    private void deleteHandler(CanalEntry.RowChange rowChange) {        log.info(">>>>执行删除的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();            for (CanalEntry.Column column : beforeColumnsList) {                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());            }        }    }    private void updateHandler(CanalEntry.RowChange rowChange) {        log.info(">>>执行更新的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(                    Collectors.toMap(                            CanalEntry.Column::getName,                            CanalEntry.Column::getValue                    )            );            Map<String, String> afterValueMap = afterColumnsList.stream().collect(                    Collectors.toMap(                            CanalEntry.Column::getName,                            CanalEntry.Column::getValue                    )            );            beforeValueMap.forEach((column, beforeValue) -> {                String afterValue = afterValueMap.get(column);                Boolean update = beforeValue.equals(afterValue);                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,                        update);            });        }    }        private void insertHandler(CanalEntry.RowChange rowChange) {        log.info(">>>执行添加 的方法");        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();        for (CanalEntry.RowData rowData : rowDatasList) {            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();            for (CanalEntry.Column column : afterColumnsList) {                if (!StringUtils.hasText(column.getValue())) {                    continue;                }                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());            }        }    }

主要信息:

Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);

//进行回滚 // canalConnector.rollback();

//确认ack 配置canalConnector.ack(message.getId());

手动确认消息消费了.

当消息 rollback() 回滚后,会再次消费这条消息.

canalConnector.rollback();

执行语句:

insert into springboot.user(id,name,age,sex,description) values(5,'canal添加用户5',25,'男','学习canal5');

SpringBoot怎么整合Canal方法

如果变成 手动确认,

canalConnector.ack(message.getId());

则只消费一次.

以上就是关于“SpringBoot怎么整合Canal方法”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯