当前位置: 代码网 > it编程>编程语言>Java > MySQL Binlog 日志监听与 Spring 集成实战场景

MySQL Binlog 日志监听与 Spring 集成实战场景

2024年12月13日 Java 我要评论
mysql binlog 日志监听与 spring 集成实战binlog的三种模式mysql 的二进制日志(binlog)有三种常见的格式:statement 模式、row 模式和mixed 模式。每

mysql binlog 日志监听与 spring 集成实战

binlog的三种模式

mysql 的二进制日志(binlog)有三种常见的格式:statement 模式row 模式mixed 模式。每种模式的设计目标不同,适用于不同的场景,以下是它们的详细对比和应用:

1. statement 模式

statement 模式下,mysql 记录的是每个执行的 sql 语句,而不是具体的数据变化。例如,执行一个 update 语句时,binlog 中记录的是该 sql 语句,而不是更新后的数据。

优点

  • 日志文件小:仅记录 sql 语句,较为轻量。
  • 性能好:对于简单 sql 操作非常高效。

缺点

  • 不确定性:对于非确定性 sql(如包含 rand()now() 等函数的语句),可能导致主从数据不一致。

2. row 模式

row 模式下,mysql 记录每一行数据的变化。如果执行 update 语句,binlog 记录的是被更新的行的具体数据,而非 sql 语句。

优点

  • 精确记录:每一行数据的变更都被完整记录,避免因 sql 语句复杂性导致的不一致问题。
  • 可靠性高:即使是非确定性的操作,也能保证数据一致性。

缺点

  • 日志文件大:每一行变化都要单独记录,可能导致日志文件急剧增大。
  • 性能开销:尤其是大批量数据变更时,性能会受到影响。

3. mixed 模式

mixed 模式结合了 statement 模式row 模式,根据具体 sql 的类型动态选择记录方式。对于简单的 sql 语句(如 insert),mysql 使用 statement 模式;对于复杂的操作或涉及多行数据的 sql 语句,则采用 row 模式

优点

  • 平衡性能与准确性:对于不同的操作选择最合适的记录方式。
  • 灵活性高:在大多数应用场景下,mixed 模式能提供较好的性能与数据一致性。

缺点

  • 配置复杂:需要理解 mysql 如何选择使用不同模式,可能导致配置不当。 如何设置 binlog 格式

可以通过修改 mysql 配置文件来设置 binlog_format 参数,具体操作如下:

[mysqld]
binlog_format=mixed

其中,statementrowmixed 分别代表 statement 模式、row 模式和 mixed 模式。选择适当的 binlog 模式取决于应用的特定需求和性能要求。不同的模式具有不同的优劣势,例如,statement 模式可能会更轻量,而 row 模式可能提供更详细的数据变化信息。

以mixed 为例

查看 binlog 是否开启

你可以通过以下 sql 查询来检查 binlog 是否开启:

show variables like '%log_bin%'

启动springboot程序

新建数据库

这个事件是一个 binlog 事件,其内容表示一个 sql 查询事件。让我解释一下这个事件的各个部分:

  • 事件类型 (***eventtype***): 该事件的类型是 query,表示这是一个 sql 查询事件。
  • 时间戳 (***timestamp***): 事件的时间戳为 1700045267000,表示事件发生的时间。
  • 线程id (***threadid***): 线程id 是 189,表示执行这个查询的线程的标识符。
  • 执行时间 (***executiontime***): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (***errorcode***): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (***database***): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • sql 查询 (***sql***): 实际的 sql 查询为 create database test2023 character set utf8 collate utf8_general_ci,表示执行了创建数据库的操作。

这个事件的作用是在 test2023 数据库中执行了一个创建数据库的 sql 查询。这是 binlog 中的一部分,用于记录数据库中的变化,以便进行数据备份、主从同步等操作。

新建表数据

create table `t_user` (
  `id` bigint(20) not null auto_increment,
  `username` varchar(100) not null,
  primary key (`id`)
) engine=innodb auto_increment=10 default charset=utf8mb4;

这个事件也是一个 binlog 事件,表示一个 sql 查询事件。让我解释一下这个事件的各个部分:

  • 事件类型 (***eventtype***): 该事件的类型是 query,表示这是一个 sql 查询事件。
  • 时间戳 (***timestamp***): 事件的时间戳为 1700045422000,表示事件发生的时间。
  • 线程id (***threadid***): 线程id 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (***executiontime***): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (***errorcode***): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (***database***): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • sql 查询 (***sql***): 实际的 sql 查询为 create table t_user(idbigint(20) not null auto_increment,username varchar(100) not null, primary key (id) ) engine=innodb auto_increment=10 default charset=utf8mb4,表示执行了在 test2023 数据库中创建名为 t_user 的表的操作。

这个事件的作用是在 test2023 数据库中创建了一个名为 t_user 的表,该表包含 idusername 两个字段,其中 id 是自增的主键。这种类型的事件常常用于记录数据库结构的变化,以便进行数据备份、迁移和版本控制等操作。

插入表数据

insert into `test2023`.`t_user` (`id`, `username`)
values
    (
        "10086",
        "用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。"
    );

这个事件也是一个 binlog 事件,表示一个 sql 查询事件,具体如下:

  • 事件类型 (***eventtype***): 该事件的类型是 query,表示这是一个 sql 查询事件。
  • 时间戳 (***timestamp***): 事件的时间戳为 1700045547000,表示事件发生的时间。
  • 线程id (***threadid***): 线程id 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (***executiontime***): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (***errorcode***): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (***database***): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • sql 查询 (***sql***): 实际的 sql 查询为 insert into test2023.t_user (id, username) values ( "10086", "用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。",表示执行了向 test2023 数据库的 t_user 表中插入一行数据的操作。

这个事件的作用是向 t_user 表中插入了一行数据,包含了 idusername 两个字段的值。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。

修改表数据修改表数据

修改表数据

update `test2023`.`t_user`
set `id` = '10086',
 `username` = '我的修改数据!!!'
where
	(`id` = '10086');

这个事件同样是一个 binlog 事件,表示一个 sql 查询事件,具体如下:

  • 事件类型 (***eventtype***): 该事件的类型是 query,表示这是一个 sql 查询事件。
  • 时间戳 (***timestamp***): 事件的时间戳为 1700045675000,表示事件发生的时间。
  • 线程id (***threadid***): 线程id 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (***executiontime***): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (***errorcode***): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (***database***): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • sql 查询 (***sql***): 实际的 sql 查询为 update test2023.t_usersetid= '10086', username = '我的修改数据!!!' where (id = '10086'),表示执行了更新 test2023 数据库中的 t_user 表中一行数据的操作。

这个事件的作用是将 t_user 表中 id10086 的行的数据进行更新,将 id 修改为 10086username 修改为 ‘我的修改数据!!!’。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。

删除表数据

delete
from
	t_user
where
	id = '10086';

这个事件同样是一个 binlog 事件,表示一个 sql 查询事件,具体如下:

  • 事件类型 (***eventtype***): 该事件的类型是 query,表示这是一个 sql 查询事件。
  • 时间戳 (***timestamp***): 事件的时间戳为 1700045755000,表示事件发生的时间。
  • 线程id (***threadid***): 线程id 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (***executiontime***): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (***errorcode***): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (***database***): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • sql 查询 (***sql***): 实际的 sql 查询为 delete from t_user where id = '10086',表示执行了删除 test2023 数据库中的 t_user 表中一行数据的操作。

这个事件的作用是删除 t_user 表中 id10086 的行。这种类型的事件通常用于记录数据的删除操作,以便进行数据备份、同步和迁移等操作。

总结: binlog_format 设置为 mixed 时,对于 insert、update 和 delete 操作,它们在 binlog 中的事件类型都会被表示为 query 事件。这是因为在 mixed 模式下,mysql 使用了不同的方式来记录不同类型的操作,但在 binlog 中,它们都被包装成了 query 事件。

在 mixed 模式下:

  • 对于某些语句级别的操作(例如非确定性的语句或不支持事务的存储引擎),会使用 statement 事件。
  • 对于其他一些情况,会使用 row 事件,将变更的行作为事件的一部分进行记录。

这就是为什么看到的 insert、update 和 delete 操作的事件类型都是 query。在处理这些事件时,需要根据具体的 sql 查询语句或其他信息来确定操作的类型。

源码示例

pom.xml

<dependency>
			<groupid>org.springframework.boot</groupid>
			<artifactid>spring-boot-starter-data-jpa</artifactid>
		</dependency>
		<dependency>
			<groupid>org.springframework.boot</groupid>
			<artifactid>spring-boot-starter-web</artifactid>
		</dependency>
		<dependency>
			<groupid>org.springframework.boot</groupid>
			<artifactid>spring-boot-starter-test</artifactid>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupid>mysql</groupid>
			<artifactid>mysql-connector-java</artifactid>
			<version>5.1.48</version> <!-- 查看最新版本 -->
		</dependency>
		<dependency>
			<groupid>com.github.shyiko</groupid>
			<artifactid>mysql-binlog-connector-java</artifactid>
			<version>0.21.0</version>
		</dependency>

java示例

package com.example.demo.listener;
import com.github.shyiko.mysql.binlog.binarylogclient;
import com.github.shyiko.mysql.binlog.event.event;
import com.github.shyiko.mysql.binlog.event.eventdata;
import com.github.shyiko.mysql.binlog.event.queryeventdata;
import com.github.shyiko.mysql.binlog.event.tablemapeventdata;
import com.github.shyiko.mysql.binlog.event.writerowseventdata;
import com.github.shyiko.mysql.binlog.event.deserialization.eventdeserializer;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import javax.naming.authenticationexception;
import java.io.ioexception;
import java.io.serializable;
import java.util.list;
import java.util.concurrent.timeoutexception;
public class binloglistenermixed {
    private static final logger logger = loggerfactory.getlogger(binloglistenermixed.class);
    private static final string mysql_host = "8.130.74.105";
    private static final int mysql_port = 3306;
    private static final string mysql_username = "root";
    private static final string mysql_password = "zhang.ting.123";
    public static void main(string[] args) {
        try {
            binarylogclient client = new binarylogclient(mysql_host, mysql_port, mysql_username, mysql_password);
//            client.setbinlogfilename(null);
//            client.setbinlogposition(-1); // 或者设置为其他适当的初始位置
//            client.setserverid(1);
//            client.setbinlogfilename("mysql-bin.000005");
//            client.setbinlogposition(154);
            eventdeserializer eventdeserializer = new eventdeserializer();
            eventdeserializer.setcompatibilitymode(
                    eventdeserializer.compatibilitymode.date_and_time_as_long,
                    eventdeserializer.compatibilitymode.char_and_binary_as_byte_array
            );
            logger.info("使用主机={}, 端口={}, 用户名={}, 密码={} 连接到 mysql", mysql_host, mysql_port, mysql_username, mysql_password);
            client.seteventdeserializer(eventdeserializer);
            client.registereventlistener(binloglistenermixed::handleevent);
            client.registerlifecyclelistener(new binarylogclient.lifecyclelistener() {
                @override
                public void onconnect(binarylogclient client) {
                    logger.info("connected to mysql server");
                }
                @override
                public void oncommunicationfailure(binarylogclient client, exception ex) {
                    logger.error("communication failure with mysql server", ex);
                }
                @override
                public void oneventdeserializationfailure(binarylogclient client, exception ex) {
                    logger.error("event deserialization failure", ex);
                }
                @override
                public void ondisconnect(binarylogclient client) {
                    logger.warn("disconnected from mysql server");
                    // 在这里添加重新连接或其他处理逻辑
                }
            });
            client.connect();
        } catch (ioexception e) {
            logger.error("@@ 连接到 mysql 时发生错误", e);
            logger.error("@@ error connecting to mysql", e);
        }
    }
    private static void handleevent(event event) {
        logger.info("@@ 打印 event: {}", event);
        logger.info("@@ received event type: {}", event.getheader().geteventtype());
        switch (event.getheader().geteventtype()) {
            case write_rows:
            case ext_write_rows:
                handlewriterowsevent((writerowseventdata) event.getdata());
                break;
            case query:
                handlequeryevent((queryeventdata) event.getdata());
                break;
            case table_map:
                handletablemapevent((tablemapeventdata) event.getdata());
                break;
            // 其他事件处理...
        }
    }
    private static void handlewriterowsevent(writerowseventdata eventdata) {
        list<serializable[]> rows = eventdata.getrows();
        // 获取表名
        string tablename = gettablename(eventdata);
        // 处理每一行数据
        for (serializable[] row : rows) {
            // 根据需要调整以下代码以获取具体的列值
            string column1value = row[0].tostring();
            string column2value = row[1].tostring();
            // 将数据备份到另一个数据库
            backuptoanotherdatabase(tablename, column1value, column2value);
        }
    }
    private static void handlequeryevent(queryeventdata eventdata) {
        string sql = eventdata.getsql();
        logger.info("@@ handlequeryevent函数执行query event sql: {}", sql);
        // 解析sql语句,根据需要处理
        // 例如,检查是否包含写入操作,然后执行相应的逻辑
    }
    private static void handletablemapevent(tablemapeventdata eventdata) {
        // 获取表映射信息,根据需要处理
        logger.info("@@ handletablemapevent函数执行tablemap event: {}", eventdata);
    }
    private static string gettablename(eventdata eventdata) {
        // 获取表名的逻辑,可以使用tablemapeventdata等信息
        // 根据实际情况实现
        return "example_table";
    }
    private static void backuptoanotherdatabase(string tablename, string column1value, string column2value) {
        // 将数据备份到另一个数据库的逻辑
        logger.info("backup to another database: table={}, column1={}, column2={}", tablename, column1value, column2value);
    }
}

总结

选择合适的 binlog 模式对数据库的性能和数据一致性至关重要:

  • statement 模式适用于简单操作,能节省存储空间,但可能导致不一致。
  • row 模式能精确记录数据变化,适合对数据一致性要求较高的场景。
  • mixed 模式平衡了性能与准确性,适用于大多数应用场景。

到此这篇关于mysql binlog 日志监听与 spring 集成实战的文章就介绍到这了,更多相关mysql binlog 日志监听内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

  • 基于Maven pom文件使用分析

    基于Maven pom文件使用分析

    projectmaven 是一个强大的构建和依赖管理工具,pom.xml 文件是 maven 项目的核心配置文件,用于定义项目的构建、依赖关系、插件、目标等。它... [阅读全文]
  • Java8 CompletableFuture异步编程解读

    Java8 CompletableFuture异步编程解读

    completablefuturede介绍java 8 引入了 completablefuture 类,这是 java 异步编程的一个重要进展。completa... [阅读全文]
  • java集合超详细(最新推荐)

    1 - 概述所有的集合类和集合接口都在java.util包下。在内存中申请一块空间用来存储数据,在java中集合就是替换掉定长的数组的一种引用数据类型。2 - 集合与数组的区别长度…

    2024年12月14日 编程语言
  • Mybatis中SQL的执行过程详解

    Mybatis中SQL的执行过程详解

    mybatis 框架sql执行过程数据库操作映射方式mybatis支持两种方式进行数据库操作映射:映射文件:通过xml文件来定义sql语句和映射关系注解方式:通... [阅读全文]
  • IDEA性能优化方法解决卡顿问题

    IDEA性能优化方法解决卡顿问题

    前言我们在使用 intellij idea的时候有时候会觉得卡顿,那么如何在不升级电脑配置的情况下让 intellij idea的性能更好呢?我们可以通过去修改... [阅读全文]
  • java 抽象类示例详解

    一、抽象类概述:我们知道,类用来模拟现实事物。一个类可以模拟一类事物,而某个类的一个实例化对象可以模拟某个属于该类的具体的事物。类中描绘了该类所有对象共同的特性,当一个类中给出的信…

    2024年12月14日 编程语言

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com