当前位置: 代码网 > it编程>数据库>Mysql > MySQL数据变化监听的实现方案

MySQL数据变化监听的实现方案

2025年02月11日 Mysql 我要评论
1. binlog 简介1.1 什么是 binlog?binlog(binary log) 是 mysql 记录 ddl(数据定义语言,如 create、alter)和 dml(数据操作语言,如 in

1. binlog 简介

1.1 什么是 binlog?

binlog(binary log) 是 mysql 记录 ddl(数据定义语言,如 createalter)和 dml(数据操作语言,如 insertupdatedelete)的日志文件,它用于:

  • 主从复制:mysql 主库将 binlog 传输到从库,实现数据同步。
  • 数据恢复:通过 mysqlbinlog 工具解析 binlog 恢复数据。
  • 数据同步:第三方工具(如 canal)解析 binlog,进行数据同步。

1.2 binlog 的三种格式

binlog 格式说明
statement记录 sql 语句本身
row记录行数据变更(推荐)
mixed结合前两者,mysql 自动判断

由于 row 格式能提供精确的行级别变更信息,因此推荐使用它。

2. 开启 binlog 并配置 mysql

2.1 检查 binlog 是否开启

show variables like 'log_bin';

如果 log_bin 值为 off,说明 binlog 未开启。

2.2 修改 mysql 配置文件(my.cnf 或 my.ini)

在 [mysqld] 部分添加以下内容:

server-id=1
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
expire_logs_days=7

重启 mysql:

systemctl restart mysql  # linux
net stop mysql && net start mysql  # windows

2.3 验证 binlog 配置

执行:

show binary logs;

如果有 binlog 文件,如 mysql-bin.000001,说明已开启。

3. 使用 java 监听 binlog

3.1 选择工具:canal

阿里巴巴开源的 canal 可以模拟 mysql 从库协议,解析 binlog 并实时推送增量数据。

3.2 java 代码监听 binlog

引入 maven 依赖

<dependencies>
    <dependency>
        <groupid>com.alibaba.otter</groupid>
        <artifactid>canal.client</artifactid>
        <version>1.1.6</version>
    </dependency>
</dependencies>

编写 java 代码

import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;

import java.net.inetsocketaddress;
import java.util.list;

public class binloglistener {
    public static void main(string[] args) {
        // 连接 canal
        canalconnector connector = canalconnectors.newsingleconnector(
                new inetsocketaddress("127.0.0.1", 11111), 
                "example", "canal", "canal");
        

        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // 监听所有库表
            connector.rollback();
    
            while (true) {
                message message = connector.getwithoutack(100); // 获取数据
                long batchid = message.getid();
                list<canalentry.entry> entries = message.getentries();
    
                if (batchid != -1 && !entries.isempty()) {
                    for (canalentry.entry entry : entries) {
                        if (entry.getentrytype() == canalentry.entrytype.rowdata) {
                            processentry(entry);
                        }
                    }
                }
                connector.ack(batchid); // 确认消息
            }
        } finally {
            connector.disconnect();
        }
    }
    
    private static void processentry(canalentry.entry entry) {
        try {
            canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(entry.getstorevalue());
            canalentry.eventtype eventtype = rowchange.geteventtype();
    
            system.out.println("变更表:" + entry.getheader().gettablename());
            system.out.println("变更类型:" + eventtype);
    
            for (canalentry.rowdata rowdata : rowchange.getrowdataslist()) {
                if (eventtype == canalentry.eventtype.delete) {
                    system.out.println("删除数据:" + rowdata.getbeforecolumnslist());
                } else if (eventtype == canalentry.eventtype.insert) {
                    system.out.println("新增数据:" + rowdata.getaftercolumnslist());
                } else {
                    system.out.println("更新前数据:" + rowdata.getbeforecolumnslist());
                    system.out.println("更新后数据:" + rowdata.getaftercolumnslist());
                }
            }
        } catch (exception e) {
            e.printstacktrace();
        }
    }

}

4. 代码解析

  • 创建 canal 连接

canalconnector connector = canalconnectors.newsingleconnector(
    new inetsocketaddress("127.0.0.1", 11111), 
    "example", "canal", "canal");
    • 127.0.0.1:canal 服务器地址
    • 11111:canal 端口
    • example:canal 实例
    • canal/canal:默认账号密码
  • 获取 binlog 变更数据

message message = connector.getwithoutack(100);
    • getwithoutack(100):拉取 100 条 binlog 事件。
  • 解析 binlog

for (canalentry.entry entry : entries) {
    if (entry.getentrytype() == canalentry.entrytype.rowdata) {
        processentry(entry);
    }
}
  • 仅处理 rowdata 类型的变更,忽略事务等其他信息。

  • 分类处理 insertupdatedelete

if (eventtype == canalentry.eventtype.delete) {
    system.out.println("删除数据:" + rowdata.getbeforecolumnslist());
} else if (eventtype == canalentry.eventtype.insert) {
    system.out.println("新增数据:" + rowdata.getaftercolumnslist());
} else {
    system.out.println("更新前数据:" + rowdata.getbeforecolumnslist());
    system.out.println("更新后数据:" + rowdata.getaftercolumnslist());
}

总结

  • mysql binlog 记录数据库变更,可用于监听增量数据。
  • canal 作为 mysql 从库解析 binlog,实现数据同步。
  • java 代码示例 展示如何用 canal 监听 insertupdatedelete 操作,并解析变更数据。

这种方案适用于 分布式数据同步缓存一致性数据变更通知,是实时数据处理的重要手段。

以上就是mysql数据变化监听的实现方案的详细内容,更多关于mysql数据变化监听的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com