引言
监听mysql binlog 大家都知道canal,但是如果是pglog呢,先百度
也就这个靠点谱,文章 没有我想要的demo
去官网看看debezium.io/
其中这个 网址给了demo 但是不能直接用于生产
功能点
首先 pglog binlog监听需要满足哪些功能点
机器宕机,能支持断点续接
进行磁盘持久化
如果监听的表 长时间没有数据变动,delay_size 会变大
//设置心跳时间,就算没有数据 也会保持心跳
props.setProperty("heartbeat.interval.ms", "20000");
for (ChangeEvent<String, String> r : records) {
try {
if (log.isDebugEnabled()) {
log.debug("{}\n{}", r.key(), r.value());
}
if (r.value() != null && r.value().startsWith("{"ts_ms")) {
continue;
}
xxx 具体数据处理
} catch (Exception e) {
log.error("PGLog-binlog param:[{}]", r, e);
}
}
心跳这个是 当时上生产的时候,突然发现没有数据变更的时候 ,有报警,说delay了。。。这顿害怕
大概意思
数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置heartbeat.interval.ms连接器配置属性。
由于WAL由所有数据库共享,因此使用的WAL数量趋于增长,直到Debezium为其捕获更改的数据库触发事件。为了克服这一点,有必要:使用heartbeat.interval.ms连接器配置属性启用周期性心跳记录生成。定期从Debezium正在捕捉变化的数据库中发出更改事件。
其中
if (r.value() != null && r.value().startsWith("{"ts_ms")) {
continue;
}
这是因为 如果没有数据来的话,会是ts_ms 开头的,代表,没有新数据
每次binlog传的size 太多,导致服务器处理不过来
props.setProperty("max.BATch.size", "200");
对多个表的监听,应该只有一个流进行监听
props.setProperty("table.include.list", schs.stream().map(BinlogConfig::getSch).map(a -> tables.stream().map(b -> a + "." + b).map(String::valueOf).collect(Collectors.joining(","))).map(String::valueOf).collect(Collectors.joining(",")));
希望磁盘持久化offset,保持数据的正确性
props.setProperty("snapshot.mode", "never");
小工具
查询数据库 offset推迟多少
select pg_replication_slots.*,
pg_current_wal_lsn(),
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(),
restart_lsn)) as delay_size
from pg_replication_slots;
这个工具在生产已经实践稳定,可以直接使用,有问题 可以评论
代码 github.com/a25017012/y…
以上就是pgsql binlog监听功能点解析的详细内容,更多关于pgsql binlog监听的资料请关注我们其它相关文章!