文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java 解析binlog实现数据同步增量等操作

2023-09-14 10:13

关注

使用mysql-binlog-connector-java进行解析

解析row格式的binlog(mysql默认的是Statement格式需要手动调整到row格式)

如果使用Statement模式的话只能得到执行的sql

binlog有三种格式:Statement, Row和Mixed.

基于SQL语句的复制(statement-based replication, SBR) 基于行的复制(row-based
replication, RBR) 混合模式复制(mixed-based replication, MBR)
(1)Statement
每一条会修改数据的sql都会记录在binlog中。

优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO, 提高了性能。

缺点:由于记录的只是执行语句,为了这些语句能在slave上正确运行,因此还必须记录每条语句在执行的时候的一些相关信息,以保证所有语句能在slave得到和在master端执行的时候相同的结果。另外mysql的复制,像一些特定函数的功能,slave可与master上要保持一致会有很多相关问题。

相比row能节约多少性能与日志量,这个取决于应用的SQL情况,正常同一条记录修改或者插入row格式所产生的日志量还小鱼statement产生的日志量,但是考虑到如果带条件的update操作,以及整表删除,alter表等操作,row格式会产生大量日志,因此在考虑是否使用row格式日志时应该根据应用的实际情况,其所产生的日志量会增加多少,以及带来的IO性能问题。

(2)Row
5.1.5版本的MySQL才开始支持row level的复制,它不记录sql语句上下文相关信息,仅保存哪条记录被修改。

优点:
binlog中可以不记录执行的sql语句的上下文相关的信息,仅需要记录那一条记录被修改成什么了。所以row的日志内容会非常清楚的记录下每一行数据修改的细节。而且不会出现某些特定情况下的存储过程,或function,以及trigger的调用和触发无法被正确复制的问题.

缺点:所有的执行的语句当记录到日志中的时候,都将以每行记录的修改来记录,这样可能会产生大量的日志内容。

新版本的MySQL中对row level模式也被做了优化,并不是所有的修改都会以row
level来记录,像遇到表结构变更的时候就会以statement模式来记录,如果sql语句确实就是update或者delete等修改数据的语句,那么还是会记录所有行的变更。

(3)Mixed 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。
在Mixed模式下,一般的语句修改使用statment格式保存binlog,如一些函数,statement无法完成主从复制的操作,则采用row格式保存binlog,MySQL会根据执行的每一条具体的sql语句来区分对待记录的日志形式,也就是在Statement和Row之间选择一种。

查看binlog模式

show global variables like%binlog_format%;

显示主服务器使用的二进制文件及大小

show master logs;

运⾏时在线修改临时格式(可以去配置文件修改)

SET GLOBAL binlog_format = 'ROW';
Binlog事件类型binlog事件类型一共有三个版本:v1: Used in MySQL 3.23v3: Used in MySQL 4.0.2 though 4.1v4: Used in MySQL 5.0 and up注:v2出现了很短的时间,并且已经不被支持现在所使用的MySQL一般都是5.5起了,所以下面陈述的都是v4版的binlog事件类型。binlog的事件类型一共有以下几种:enum Log_event_type {   UNKNOWN_EVENT= 0,   START_EVENT_V3= 1,   QUERY_EVENT= 2,   STOP_EVENT= 3,   ROTATE_EVENT= 4,   INTVAR_EVENT= 5,   LOAD_EVENT= 6,   SLAVE_EVENT= 7,   CREATE_FILE_EVENT= 8,   APPEND_BLOCK_EVENT= 9,   EXEC_LOAD_EVENT= 10,   DELETE_FILE_EVENT= 11,   NEW_LOAD_EVENT= 12,   RAND_EVENT= 13,   USER_VAR_EVENT= 14,   FORMAT_DESCRIPTION_EVENT= 15,   XID_EVENT= 16,   BEGIN_LOAD_QUERY_EVENT= 17,   EXECUTE_LOAD_QUERY_EVENT= 18,   TABLE_MAP_EVENT = 19,   PRE_GA_WRITE_ROWS_EVENT = 20,   PRE_GA_UPDATE_ROWS_EVENT = 21,   PRE_GA_DELETE_ROWS_EVENT = 22,   WRITE_ROWS_EVENT = 23,   UPDATE_ROWS_EVENT = 24,   DELETE_ROWS_EVENT = 25,   INCIDENT_EVENT= 26,   HEARTBEAT_LOG_EVENT= 27,   IGNORABLE_LOG_EVENT= 28,  ROWS_QUERY_LOG_EVENT= 29,  WRITE_ROWS_EVENT_V2 = 30,  UPDATE_ROWS_EVENT_V2 = 31,  DELETE_ROWS_EVENT_V2 = 32,  GTID_LOG_EVENT= 33,  ANONYMOUS_GTID_LOG_EVENT= 34,  PREVIOUS_GTIDS_LOG_EVENT= 35,   ENUM_END_EVENT    };

准备工作 pom引入

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-bootstrap</artifactId>    <version>4.0.1</version></dependency><dependency>    <groupId>com.alibaba.nacos</groupId>    <artifactId>nacos-spring-context</artifactId>    <version>0.3.6</version></dependency><dependency>    <groupId>com.github.shyiko</groupId>    <artifactId>mysql-binlog-connector-java</artifactId>    <version>0.21.0</version></dependency>

配置文件(nacos)

binlog:  host: 10.10.10.115  port: 3306  username: root  passwd: 123456#监听数据库  db: function#监听表结构  table: function

代码

import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;@Componentpublic class SpringContextUtil implements ApplicationContextAware {    private static ApplicationContext applicationContext;    @Override    public void setApplicationContext(ApplicationContext context) throws BeansException {        applicationContext = context;    }        public static ApplicationContext getApplicationContext() {        return applicationContext;    }        public static <T> T getBean(Class<T> requiredType) {        return getApplicationContext().getBean(requiredType);    }        public static <T> T getBean(String name) {        return (T) getApplicationContext().getBean(name);    }}
import com.alibaba.nacos.api.config.annotation.NacosValue;import com.alibaba.nacos.spring.context.annotation.config.NacosPropertySource;import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.context.annotation.Configuration;import org.springframework.stereotype.Component;@Data@RefreshScope@Configuration@Component@ConfigurationProperties(prefix = "binlog")@NacosPropertySource(dataId = "dd-function-api", autoRefreshed = true)public class BinLogConstants {    @NacosValue(value = "${host:127.0.0.1}", autoRefreshed = true)    private String host;    @NacosValue(value = "${port:3306}", autoRefreshed = true)    private int port;    @NacosValue(value = "${username:root}", autoRefreshed = true)    private String username;    @NacosValue(value = "${passwd:123456}", autoRefreshed = true)    private String passwd;    @NacosValue(value = "${db:root}", autoRefreshed = true)    private String db;    @NacosValue(value = "${table:root}", autoRefreshed = true)    private String table;    public static final int consumerThreads = 4;    public static final long queueSleep = 1000;}
import lombok.Data;@Datapublic class Colum {  public int inx;  public String colName; // 列名  public String dataType; // 类型  public String schema; // 数据库  public String table; // 表  public Colum(String schema, String table, int idx, String colName, String dataType) {    this.schema = schema;    this.table = table;    this.colName = colName;    this.dataType = dataType;    this.inx = idx;  }}
import com.github.shyiko.mysql.binlog.event.EventType;import com.google.api.client.util.Maps;import lombok.Data;import java.io.Serializable;import java.util.Map;import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;@Datapublic class BinLogItem implements Serializable {    private static final long serialVersionUID = 5503152746318421290L;    private String dbTable;    private EventType eventType;    private Long timestamp = null;    private Long serverId = null;    // 存储字段-之前的值之后的值    private Map<String, Serializable> before = null;    private Map<String, Serializable> after = null;    // 存储字段--类型    private Map<String, Colum> colums = null;        public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, String dbTable, Map<String, Colum> columMap, EventType eventType) {        if (null == row || null == columMap) {            return null;        }        if (row.length != columMap.size()) {            //TODO 根据dbTable重新拉取一下结构            return null;        }        // 初始化Item        BinLogItem item = new BinLogItem();        item.eventType = eventType;        item.colums = columMap;        item.before = Maps.newHashMap();        item.after = Maps.newHashMap();        Map<String, Serializable> beOrAf = Maps.newHashMap();        columMap.forEach((key, colum) -> {            Serializable serializable = row[colum.inx];            if (serializable instanceof byte[]) {                beOrAf.put(key, new String((byte[]) serializable));            } else {                beOrAf.put(key, serializable);            }        });        // 写操作放after,删操作放before        if (isWrite(eventType)) {            item.after = beOrAf;        }        if (isDelete(eventType)) {            item.before = beOrAf;        }        return item;    }        public static BinLogItem itemFromUpdate(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, Colum> columMap, EventType eventType) {        if (null == mapEntry || null == columMap) {            return null;        }        // 初始化Item        BinLogItem item = new BinLogItem();        item.eventType = eventType;        item.colums = columMap;        item.before = Maps.newHashMap();        item.after = Maps.newHashMap();        Map<String, Serializable> be = Maps.newHashMap();        Map<String, Serializable> af = Maps.newHashMap();        columMap.forEach((key, colum) -> {            Serializable serializableKey = mapEntry.getKey()[colum.inx];            Serializable serializableValue = mapEntry.getValue()[colum.inx];            if (serializableKey instanceof byte[]) {                be.put(key, new String((byte[]) serializableKey));            } else {                be.put(key, serializableKey);            }            if (serializableValue instanceof byte[]) {                af.put(key, new String((byte[]) serializableValue));            } else {                af.put(key, serializableValue);            }        });        item.before = be;        item.after = af;        return item;    }}
import cn.hutool.core.date.DateUtil;import cn.hutool.core.util.StrUtil;import com.energy.cloud.function.config.SpringContextUtil;import com.github.shyiko.mysql.binlog.event.EventType;import com.google.api.client.util.Lists;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.sql.DataSource;import java.io.Serializable;import java.sql.*;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;import static com.github.shyiko.mysql.binlog.event.EventType.*;@Slf4j@Componentpublic class BinLogUtils {    private static BinLogUtils binLogUtils;    @PostConstruct    public void init() {        binLogUtils = this;//    binLogUtils.searchStoreLogoExtMapper = this.searchStoreLogoExtMapper;    }        public static String getdbTable(String db, String table) {        return db + "-" + table;    }        public static Map<String, Colum> getColMap(String db, String table) {        PreparedStatement ps = null;        ResultSet rs = null;        Connection connection = null;        try {            //获取数据源            DataSource dataSource = SpringContextUtil.getBean(DataSource.class);            connection = dataSource.getConnection();            // 执行sql获取表数据            String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";            ps = connection.prepareStatement(preSql);            ps.setString(1, db);            ps.setString(2, table);            rs = ps.executeQuery();            Map<String, Colum> map = new HashMap<>(rs.getRow());            while (rs.next()) {                String schema = rs.getString("TABLE_SCHEMA");                String tableName = rs.getString("TABLE_NAME");                String column = rs.getString("COLUMN_NAME");                int idx = rs.getInt("ORDINAL_POSITION");                String dataType = rs.getString("DATA_TYPE");                if (column != null && idx >= 1) {                    map.put(column, new Colum(schema, tableName, idx - 1, column, dataType)); // sql的位置从1开始                }            }            ps.close();            rs.close();            connection.close();            return map;        } catch (SQLException e) {            log.error("load db conf error, db_table={}:{} ", db, table, e);        } finally {            try {                if (ps != null) {                    ps.close();                }                if (rs != null) {                    rs.close();                }                if (connection != null) {                    connection.close();                }            } catch (SQLException e) {                throw new RuntimeException(e);            }        }        return null;    }        public static String getTable(String dbTable) {        if (StrUtil.isEmpty(dbTable)) {            return "";        }        String[] split = dbTable.split("-");        if (split.length == 2) {            return split[1];        }        return "";    }        public static List<String> getListByStr(String str) {        if (StrUtil.isEmpty(str)) {            return Lists.newArrayList();        }        return Arrays.asList(str.split(","));    }        public static Map<String, Serializable> getOptMap(BinLogItem binLogItem) {        // 获取操作类型        EventType eventType = binLogItem.getEventType();        if (isWrite(eventType) || isUpdate(eventType)) {            return binLogItem.getAfter();        }        if (isDelete(eventType)) {            return binLogItem.getBefore();        }        return null;    }        public static Integer getOptType(BinLogItem binLogItem) {        // 获取操作类型        EventType eventType = binLogItem.getEventType();        if (isWrite(eventType)) {            return 1;        }        if (isUpdate(eventType)) {            return 2;        }        if (isDelete(eventType)) {            return 3;        }        return null;    }        public static String getImgUrl(Long storeId) {        if (storeId == null) {            return "";        }        return "";    }        public static Date getDateFormat(Date date) {        if (date == null) {            return null;        }        String dateFormat = "yyyy-MM-dd HH:mm:ss";        String strDate = DateUtil.format(date, dateFormat);        if (StrUtil.isEmpty(strDate)) {            return null;        }        Date formatDate = DateUtil.parse(strDate, dateFormat).toSqlDate();        return formatDate;    }}
@FunctionalInterfacepublic interface BinLogListener {  void onEvent(BinLogItem item);}
import com.github.shyiko.mysql.binlog.BinaryLogClient;import com.github.shyiko.mysql.binlog.event.*;import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;import com.google.common.collect.ArrayListMultimap;import com.google.common.collect.Multimap;import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.annotation.Async;import java.io.IOException;import java.io.Serializable;import java.util.Map;import java.util.concurrent.*;import static com.energy.cloud.function.binlogsynchronization.BinLogUtils.getColMap;import static com.energy.cloud.function.binlogsynchronization.BinLogUtils.getdbTable;import static com.github.shyiko.mysql.binlog.event.EventType.*;@Slf4jpublic class MysqlBinLogListener implements BinaryLogClient.EventListener {    private int consumerThreads = BinLogConstants.consumerThreads;    private BinaryLogClient parseClient;    private BlockingQueue<BinLogItem> queue;    private final ExecutorService consumer;    // 存放每张数据表对应的listener    private Multimap<String, BinLogListener> listeners;    private Map<String, Map<String, Colum>> dbTableCols;    private Map<Long, String> dbTableIdCols;        public MysqlBinLogListener(BinLogConstants conf) {        BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPasswd());        EventDeserializer eventDeserializer = new EventDeserializer();        //序列化        eventDeserializer.setCompatibilityMode(                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY        );        client.setEventDeserializer(eventDeserializer);        this.parseClient = client;        this.queue = new ArrayBlockingQueue<>(1024);        this.listeners = ArrayListMultimap.create();        this.dbTableCols = new ConcurrentHashMap<>();        this.dbTableIdCols = new ConcurrentHashMap<>();//自定义线程名        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-BinLogListener-runner-%d").build();        this.consumer = new ThreadPoolExecutor(consumerThreads, consumerThreads,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<>(), namedThreadFactory);    }        @Override    public void onEvent(Event event) {        EventType eventType = event.getHeader().getEventType();        if (eventType == EventType.TABLE_MAP) {            TableMapEventData tableData = event.getData();            String db = tableData.getDatabase();            String table = tableData.getTable();            String dbTable = getdbTable(db, table);            if (dbTableCols.containsKey(dbTable)) {                dbTableIdCols.put(tableData.getTableId(), dbTable);            }        }        // 只处理添加删除更新三种操作        if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {            if (isWrite(eventType)) {                WriteRowsEventData data = event.getData();                for (Serializable[] row : data.getRows()) {                    if (dbTableIdCols.containsKey(data.getTableId())) {                        String dbTable = dbTableIdCols.get(data.getTableId());                        BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTable, dbTableCols.get(dbTable), eventType);                        item.setDbTable(dbTable);                        queue.add(item);                    }                }            }            if (isUpdate(eventType)) {                UpdateRowsEventData data = event.getData();                for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {                    if (dbTableIdCols.containsKey(data.getTableId())) {                        String dbTable = dbTableIdCols.get(data.getTableId());                        BinLogItem item = BinLogItem.itemFromUpdate(row, dbTableCols.get(dbTable), eventType);                        item.setDbTable(dbTable);                        queue.add(item);                    }                }            }            if (isDelete(eventType)) {                DeleteRowsEventData data = event.getData();                for (Serializable[] row : data.getRows()) {                    if (dbTableIdCols.containsKey(data.getTableId())) {                        String dbTable = dbTableIdCols.get(data.getTableId());                        BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row,dbTable, dbTableCols.get(dbTable), eventType);                        item.setDbTable(dbTable);                        queue.add(item);                    }                }            }        }    }        public void regListener(String db, String table, BinLogListener listener) {        String dbTable = getdbTable(db, table);        // 获取字段集合        Map<String, Colum> cols = getColMap(db, table);        // 保存字段信息        dbTableCols.put(dbTable, cols);        // 保存当前注册的listener        listeners.put(dbTable, listener);    }        @Async    public void parse() throws IOException, TimeoutException {        parseClient.registerEventListener(this);        //可以自定义读取文件和位置              //  parseClient.setBinlogFilename(xxx);              //  parseClient.setBinlogPosition(xxxx);        for (int i = 0; i < consumerThreads; i++) {            consumer.execute(() -> {                while (true) {                    if (queue.size() > 0) {                        try {BinLogItem item = queue.take();String dbtable = item.getDbTable();listeners.get(dbtable).forEach(binLogListener -> binLogListener.onEvent(item));                        } catch (InterruptedException e) {e.printStackTrace();                        }                    }                    try {                        Thread.sleep(BinLogConstants.queueSleep);                    } catch (InterruptedException e) {                        throw new RuntimeException(e);                    }                }            });        }        //不设置时间将会使用主线程,没啥影响就是启动时一直转圈        parseClient.connect(BinLogConstants.queueSleep);              //可以设置定时器将现在的文件和位置记录下来,配合上面的代码//Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> {//                log.info("Binlog name:{},position:{}", //parseClient.getBinlogFilename(), parseClient.getBinlogPosition());//            }//        }, 1, 1, TimeUnit.MINUTES);    }}
import cn.hutool.core.collection.CollectionUtil;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.CommandLineRunner;import org.springframework.core.annotation.Order;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.List;@Slf4j@Component@Order(value = 1)public class TourBinLogListener implements CommandLineRunner {  @Resource  private BinLogConstants binLogConstants;  @Override  public void run(String... args) throws Exception {    log.info("初始化配置信息:" + binLogConstants.toString());    // 初始化监听器    MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(binLogConstants);    // 获取table集合    List<String> tableList = BinLogUtils.getListByStr(binLogConstants.getTable());    if (CollectionUtil.isEmpty(tableList)) {      return;    }    // 注册监听    tableList.forEach(table -> {      log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table);      try {        mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> {         log.info(item.getAfter().toString());          log.info("监听逻辑处理");        });      } catch (Exception e) {        log.error("BinLog监听异常:" + e);      }    });    // 多线程消费   mysqlBinLogListener.parse();  }}

来源地址:https://blog.csdn.net/ycg_x_y/article/details/129949381

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯