1. binlog 简介
1.1 什么是 binlog?
binlog(binary log) 是 mysql 记录 ddl(数据定义语言,如 create
、alter
)和 dml(数据操作语言,如 insert
、update
、delete
)的日志文件,它用于:
- 主从复制:mysql 主库将 binlog 传输到从库,实现数据同步。
- 数据恢复:通过
mysqlbinlog
工具解析 binlog 恢复数据。 - 数据同步:第三方工具(如 canal)解析 binlog,进行数据同步。
1.2 binlog 的三种格式
binlog 格式 | 说明 |
---|---|
statement | 记录 sql 语句本身 |
row | 记录行数据变更(推荐) |
mixed | 结合前两者,mysql 自动判断 |
由于 row 格式能提供精确的行级别变更信息,因此推荐使用它。
2. 开启 binlog 并配置 mysql
2.1 检查 binlog 是否开启
show variables like 'log_bin';
如果 log_bin
值为 off
,说明 binlog 未开启。
2.2 修改 mysql 配置文件(my.cnf 或 my.ini)
在 [mysqld]
部分添加以下内容:
server-id=1 log-bin=mysql-bin binlog-format=row binlog-row-image=full expire_logs_days=7
重启 mysql:
systemctl restart mysql # linux net stop mysql && net start mysql # windows
2.3 验证 binlog 配置
执行:
show binary logs;
如果有 binlog 文件,如 mysql-bin.000001
,说明已开启。
3. 使用 java 监听 binlog
3.1 选择工具:canal
阿里巴巴开源的 canal 可以模拟 mysql 从库协议,解析 binlog 并实时推送增量数据。
3.2 java 代码监听 binlog
引入 maven 依赖
<dependencies> <dependency> <groupid>com.alibaba.otter</groupid> <artifactid>canal.client</artifactid> <version>1.1.6</version> </dependency> </dependencies>
编写 java 代码
import com.alibaba.otter.canal.client.canalconnector; import com.alibaba.otter.canal.client.canalconnectors; import com.alibaba.otter.canal.protocol.canalentry; import com.alibaba.otter.canal.protocol.message; import java.net.inetsocketaddress; import java.util.list; public class binloglistener { public static void main(string[] args) { // 连接 canal canalconnector connector = canalconnectors.newsingleconnector( new inetsocketaddress("127.0.0.1", 11111), "example", "canal", "canal"); try { connector.connect(); connector.subscribe(".*\\..*"); // 监听所有库表 connector.rollback(); while (true) { message message = connector.getwithoutack(100); // 获取数据 long batchid = message.getid(); list<canalentry.entry> entries = message.getentries(); if (batchid != -1 && !entries.isempty()) { for (canalentry.entry entry : entries) { if (entry.getentrytype() == canalentry.entrytype.rowdata) { processentry(entry); } } } connector.ack(batchid); // 确认消息 } } finally { connector.disconnect(); } } private static void processentry(canalentry.entry entry) { try { canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(entry.getstorevalue()); canalentry.eventtype eventtype = rowchange.geteventtype(); system.out.println("变更表:" + entry.getheader().gettablename()); system.out.println("变更类型:" + eventtype); for (canalentry.rowdata rowdata : rowchange.getrowdataslist()) { if (eventtype == canalentry.eventtype.delete) { system.out.println("删除数据:" + rowdata.getbeforecolumnslist()); } else if (eventtype == canalentry.eventtype.insert) { system.out.println("新增数据:" + rowdata.getaftercolumnslist()); } else { system.out.println("更新前数据:" + rowdata.getbeforecolumnslist()); system.out.println("更新后数据:" + rowdata.getaftercolumnslist()); } } } catch (exception e) { e.printstacktrace(); } } }
4. 代码解析
创建 canal 连接
canalconnector connector = canalconnectors.newsingleconnector( new inetsocketaddress("127.0.0.1", 11111), "example", "canal", "canal");
127.0.0.1
:canal 服务器地址11111
:canal 端口example
:canal 实例canal/canal
:默认账号密码
获取 binlog 变更数据
message message = connector.getwithoutack(100);
getwithoutack(100)
:拉取 100 条 binlog 事件。
解析 binlog
for (canalentry.entry entry : entries) { if (entry.getentrytype() == canalentry.entrytype.rowdata) { processentry(entry); } }
仅处理
rowdata
类型的变更,忽略事务等其他信息。分类处理
insert
、update
、delete
if (eventtype == canalentry.eventtype.delete) { system.out.println("删除数据:" + rowdata.getbeforecolumnslist()); } else if (eventtype == canalentry.eventtype.insert) { system.out.println("新增数据:" + rowdata.getaftercolumnslist()); } else { system.out.println("更新前数据:" + rowdata.getbeforecolumnslist()); system.out.println("更新后数据:" + rowdata.getaftercolumnslist()); }
总结
- mysql binlog 记录数据库变更,可用于监听增量数据。
- canal 作为 mysql 从库解析 binlog,实现数据同步。
- java 代码示例 展示如何用 canal 监听
insert
、update
、delete
操作,并解析变更数据。
这种方案适用于 分布式数据同步、缓存一致性 和 数据变更通知,是实时数据处理的重要手段。
以上就是mysql数据变化监听的实现方案的详细内容,更多关于mysql数据变化监听的资料请关注代码网其它相关文章!
发表评论