文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

canal

2019-03-08 16:41

关注

canal

1.  canal 简介

canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

canal 工作原理:

官网地址:https://github.com/alibaba/canal

2.  canal 服务端

 (1)修改 /etc/my.cnf 文件,增加以下配置(PS:改完后别忘了重启数据库 systemctl restart mysqld )

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

备忘录

show variables like "log_bin";
show binary logs;
show master status;
select * from mysql.`user`;

(2)授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY "Canal@123456";  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "canal"@"%";
-- GRANT ALL PRIVILEGES ON *.* TO "canal"@"%" ;
FLUSH PRIVILEGES;

 (3)下载canal,并解压缩

https://github.com/alibaba/canal/releases

(4)修改配置

vim conf/example/instance.properties

启动

sh bin/startup.sh

查看日志

# 查看 server 日志
vi logs/canal/canal.log

# 查看 instance 的日志
vi logs/example/example.log

# 关闭
sh bin/stop.sh

3. canal 客户端

依赖


    com.alibaba.otter
    canal.client
    1.1.5


    com.alibaba.otter
    canal.protocol
    1.1.5

配置

canal.server.host=192.168.28.32
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal

spring.kafka.bootstrap-servers=192.168.28.32:9092

logback.xml



    
    
    

    
    
        
            ${log.pattern}
            ${log.charset}
        
    
    
        ${log.dir}/my-canal-client.log
        
            ${log.dir}/my-canal-client.%d{yyyy-MM-dd}.log
            30
            3GB
        
        
            ${log.pattern}
        
    

    
        
        
    

测试代码

package com.my.component.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;

import java.util.List;


@Slf4j
public class Demo {

    private volatile boolean running = false;
    private CanalConnector connector;
    private Thread thread = null;

    public Demo(CanalConnector connector) {
        this.connector = connector;
    }

    public void start() {
        Assert.notNull(connector, "connector is null");
        thread = new Thread(this::process);
        running = true;
        thread.start();
    }

    public void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    private void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                connector.connect();
                connector.subscribe(".*\..*");
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printEntry(message.getEntries());
                    }

                    if (batchId != -1) {
                        connector.ack(batchId); // 提交确认
                    }
                }
            } catch (Throwable e) {
                log.error("process error!", e);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e1) {
                    // ignore
                }

                connector.rollback(); // 处理失败, 回滚数据
            } finally {
                connector.disconnect();
            }
        }
    }

    private void printEntry(List entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

启动

package com.my.component.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.net.SocketAddress;


@Slf4j
@Component
public class MyCommandLineRunner implements CommandLineRunner {

    @Autowired
    private CanalServerProperties canalServerProperties;
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Override
    public void run(String... args) throws Exception {
        String destination = "example";
//        String ip = AddressUtils.getHostIp();
//        String ip = canalServerProperties.getHost();
        String ip = "192.168.28.32";
        SocketAddress address = new InetSocketAddress(ip, 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "canal", "canal");

        Demo demo = new Demo(connector);
        demo.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                log.info("## stop the canal client");
                demo.stop();
            } catch (Throwable e) {
                log.warn("##something goes wrong when stopping canal:", e);
            } finally {
                log.info("## canal client is down.");
            }
        }));
    }
}

 

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯