1. canal 简介
canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
canal 工作原理:
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
官网地址:https://github.com/alibaba/canal
2. canal 服务端
(1)修改 /etc/my.cnf 文件,增加以下配置(PS:改完后别忘了重启数据库 systemctl restart mysqld )
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
备忘录
show variables like "log_bin";
show binary logs;
show master status;
select * from mysql.`user`;
(2)授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY "Canal@123456";
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "canal"@"%";
-- GRANT ALL PRIVILEGES ON *.* TO "canal"@"%" ;
FLUSH PRIVILEGES;
(3)下载canal,并解压缩
https://github.com/alibaba/canal/releases
(4)修改配置
vim conf/example/instance.properties
启动
sh bin/startup.sh
查看日志
# 查看 server 日志
vi logs/canal/canal.log
# 查看 instance 的日志
vi logs/example/example.log
# 关闭
sh bin/stop.sh
3. canal 客户端
依赖
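<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>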
配置
canal.server.host=192.168.28.32
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
spring.kafka.bootstrap-servers=192.168.28.32:9092
logback.xml
${log.pattern}
${log.charset}
${log.dir}/my-canal-client.log
${log.dir}/my-canal-client.%d{yyyy-MM-dd}.log
30
3GB
${log.pattern}
测试代码
package com.my.component.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.List;
@Slf4j
public class Demo {
private volatile boolean running = false;
private CanalConnector connector;
private Thread thread = null;
public Demo(CanalConnector connector) {
this.connector = connector;
}
public void start() {
Assert.notNull(connector, "connector is null");
thread = new Thread(this::process);
running = true;
thread.start();
}
public void stop() {
if (!running) {
return;
}
running = false;
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
// ignore
}
}
}
private void process() {
int batchSize = 5 * 1024;
while (running) {
try {
connector.connect();
connector.subscribe(".*\..*");
while (running) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
printEntry(message.getEntries());
}
if (batchId != -1) {
connector.ack(batchId); // 提交确认
}
}
} catch (Throwable e) {
log.error("process error!", e);
try {
Thread.sleep(1000L);
} catch (InterruptedException e1) {
// ignore
}
connector.rollback(); // 处理失败, 回滚数据
} finally {
connector.disconnect();
}
}
}
private void printEntry(List entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private void printColumn(List columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
启动
package com.my.component.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
 * Spring Boot start-up hook that creates a single-node canal connector, starts
 * the {@link Demo} client on it and registers a JVM shutdown hook that stops
 * the client again.
 */
@Slf4j
@Component
public class MyCommandLineRunner implements CommandLineRunner {

    @Autowired
    private CanalServerProperties canalServerProperties;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Builds the connector for the hard-coded server address and destination,
     * starts the client and arranges for it to be stopped on JVM shutdown.
     *
     * @param args command-line arguments passed to the application (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        // The host is currently fixed; AddressUtils.getHostIp() or
        // canalServerProperties.getHost() were considered as alternatives.
        SocketAddress serverAddress = new InetSocketAddress("192.168.28.32", 11111);
        CanalConnector canalConnector =
                CanalConnectors.newSingleConnector(serverAddress, "example", "canal", "canal");

        Demo client = new Demo(canalConnector);
        client.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(client)));
    }

    /** Stops the given client, logging before, after and on failure. */
    private void shutdown(Demo client) {
        try {
            log.info("## stop the canal client");
            client.stop();
        } catch (Throwable e) {
            log.warn("##something goes wrong when stopping canal:", e);
        } finally {
            log.info("## canal client is down.");
        }
    }
}