1. binlog 简介
1.1 什么是 binlog?
binlog(binary log) 是 mysql 记录 ddl(数据定义语言,如 create、alter)和 dml(数据操作语言,如 insert、update、delete)的日志文件,它用于:
- 主从复制:mysql 主库将 binlog 传输到从库,实现数据同步。
- 数据恢复:通过
mysqlbinlog工具解析 binlog 恢复数据。 - 数据同步:第三方工具(如 canal)解析 binlog,进行数据同步。
1.2 binlog 的三种格式
| binlog 格式 | 说明 |
|---|---|
| statement | 记录 sql 语句本身 |
| row | 记录行数据变更(推荐) |
| mixed | 结合前两者,mysql 自动判断 |
由于 row 格式能提供精确的行级别变更信息,因此推荐使用它。
2. 开启 binlog 并配置 mysql
2.1 检查 binlog 是否开启
show variables like 'log_bin';
如果 log_bin 值为 off,说明 binlog 未开启。
2.2 修改 mysql 配置文件(my.cnf 或 my.ini)
在 [mysqld] 部分添加以下内容:
server-id=1 log-bin=mysql-bin binlog-format=row binlog-row-image=full expire_logs_days=7
重启 mysql:
systemctl restart mysql # linux net stop mysql && net start mysql # windows
2.3 验证 binlog 配置
执行:
show binary logs;
如果有 binlog 文件,如 mysql-bin.000001,说明已开启。
3. 使用 java 监听 binlog
3.1 选择工具:canal
阿里巴巴开源的 canal 可以模拟 mysql 从库协议,解析 binlog 并实时推送增量数据。
3.2 java 代码监听 binlog
引入 maven 依赖
<dependencies>
<dependency>
<groupid>com.alibaba.otter</groupid>
<artifactid>canal.client</artifactid>
<version>1.1.6</version>
</dependency>
</dependencies>
编写 java 代码
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;
import java.net.inetsocketaddress;
import java.util.list;
public class binloglistener {
public static void main(string[] args) {
// 连接 canal
canalconnector connector = canalconnectors.newsingleconnector(
new inetsocketaddress("127.0.0.1", 11111),
"example", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*\\..*"); // 监听所有库表
connector.rollback();
while (true) {
message message = connector.getwithoutack(100); // 获取数据
long batchid = message.getid();
list<canalentry.entry> entries = message.getentries();
if (batchid != -1 && !entries.isempty()) {
for (canalentry.entry entry : entries) {
if (entry.getentrytype() == canalentry.entrytype.rowdata) {
processentry(entry);
}
}
}
connector.ack(batchid); // 确认消息
}
} finally {
connector.disconnect();
}
}
private static void processentry(canalentry.entry entry) {
try {
canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(entry.getstorevalue());
canalentry.eventtype eventtype = rowchange.geteventtype();
system.out.println("变更表:" + entry.getheader().gettablename());
system.out.println("变更类型:" + eventtype);
for (canalentry.rowdata rowdata : rowchange.getrowdataslist()) {
if (eventtype == canalentry.eventtype.delete) {
system.out.println("删除数据:" + rowdata.getbeforecolumnslist());
} else if (eventtype == canalentry.eventtype.insert) {
system.out.println("新增数据:" + rowdata.getaftercolumnslist());
} else {
system.out.println("更新前数据:" + rowdata.getbeforecolumnslist());
system.out.println("更新后数据:" + rowdata.getaftercolumnslist());
}
}
} catch (exception e) {
e.printstacktrace();
}
}
}
4. 代码解析
创建 canal 连接
canalconnector connector = canalconnectors.newsingleconnector(
new inetsocketaddress("127.0.0.1", 11111),
"example", "canal", "canal");
127.0.0.1:canal 服务器地址11111:canal 端口example:canal 实例canal/canal:默认账号密码
获取 binlog 变更数据
message message = connector.getwithoutack(100);
getwithoutack(100):拉取 100 条 binlog 事件。
解析 binlog
for (canalentry.entry entry : entries) {
if (entry.getentrytype() == canalentry.entrytype.rowdata) {
processentry(entry);
}
}
仅处理
rowdata类型的变更,忽略事务等其他信息。分类处理
insert、update、delete
if (eventtype == canalentry.eventtype.delete) {
system.out.println("删除数据:" + rowdata.getbeforecolumnslist());
} else if (eventtype == canalentry.eventtype.insert) {
system.out.println("新增数据:" + rowdata.getaftercolumnslist());
} else {
system.out.println("更新前数据:" + rowdata.getbeforecolumnslist());
system.out.println("更新后数据:" + rowdata.getaftercolumnslist());
}
总结
- mysql binlog 记录数据库变更,可用于监听增量数据。
- canal 作为 mysql 从库解析 binlog,实现数据同步。
- java 代码示例 展示如何用 canal 监听
insert、update、delete操作,并解析变更数据。
这种方案适用于 分布式数据同步、缓存一致性 和 数据变更通知,是实时数据处理的重要手段。
以上就是mysql数据变化监听的实现方案的详细内容,更多关于mysql数据变化监听的资料请关注代码网其它相关文章!
发表评论