前言
最近有一个项目,频繁读配置文件,如果用mysql怕压力大,一直用的redis,用redis虽然解决了读的问题,但是修改的话也要单独写配置,同时在其他环境配置也不方便保存,为此找了mysql和redis同步方案canal。
canal 是什么
canal 是阿里巴巴开源的一款基于 mysql 数据库二进制日志(binlog)增量订阅和消费的中间件。它的名字来源于 "canal"(运河、通道),寓意着在数据库和应用之间建立一条数据同步的通道。
工作原理流程: mysql 主库 → binlog(二进制日志) → canal server(伪装成从库) → 解析日志 → 发送数据 → 各种下游系统
技术原理
canal 模拟 mysql slave 的交互协议
向 mysql master 发送 dump 请求
mysql master 收到请求后,开始推送 binlog 给 canal
canal 解析 binlog 对象(原始为 byte 流)
将解析后的数据发送到消息队列或其他下游系统
主要应用场景
| 应用场景 | 具体说明 | 典型案例 |
|---|---|---|
| 数据库镜像 | 实时备份数据库数据 | 主从数据库同步、异地多活 |
| 数据库实时备份 | 增量数据的实时备份 | 灾备系统、数据恢复 |
| 多级索引构建 | 同步数据到搜索引擎 | elasticsearch、solr 索引更新 |
| 业务缓存刷新 | 数据库变更后更新缓存 | redis 缓存一致性维护 |
| 价格变化监控 | 监控特定数据的变化 | 电商价格监控、库存预警 |
| 增量数据订阅 | 为其他系统提供增量数据 | 数据仓库 etl、大数据分析 |
| 异地数据同步 | 跨机房、跨地域数据同步 | 全球化业务数据同步 |
linux系统环境准备
1.宝塔面板mysql修改配置
server-id=1 #master端的id号【必须是唯一的】;
log_bin=mysql-bin #同步的日志路径,一定注意这个目录要是mysql有权限写入的
binlog-format=row #行级,记录每次操作后每行记录的变化。
binlog-do-db=game_mqtt #指定库,缩小监控的范围。


查看是否生效:
show variables like 'log_bin';
查看正在写入的binlog文件状态:
show master status;

2. mysql为canal配置权限
在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限
账号密码都是:canal
grant select, replication slave, replication client on *.* to 'canal'@'%' identified by 'canal' ;

3.安装canal服务端
安装 canal.deployer-1.1.6.tar.gz
官网下载地址https://github.com/alibaba/canal/releases
备用下载地址:https://download.csdn.net/download/qq_33215204/88369947

将文件上传服务器目录 /data/soft/canal

解压文件
tar -zxvf canal.deployer-1.1.6.tar.gz
4.配置canal
查看 conf/canal.properties 配置,发现需要暴漏三个端口
canal.admin.port = 11110
canal.port = 11111
canal.metrics.pull.port = 11112

conf/canal.properties 配置
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example
修改 conf/example/instance.properties 实例配置
# 配置 slaveid 自定义,不等于 mysql 的 server id 即可 canal.instance.mysql.slaveid=10 # 数据库地址:自己的数据库ip+端口 canal.instance.master.address=127.0.0.1:3306 # 数据库用户名和密码 canal.instance.dbusername=canal canal.instance.dbpassword=canal #代表数据库的编码方式对应到 java 中的编码类型,比如 utf-8,gbk , iso-8859-1 canal.instance.connectioncharset = utf-8 # 指定库和表,这里的 .* 表示 canal.instance.master.address 下面的所有数据库 canal.instance.filter.regex=.*\\..*
配置完成之后启动
5.启动canal
cd /data/soft/canal/bin
./startup.sh


启动完成之后去看下有没有日志信息。


2023-09-24 12:14:24.280 [main] info c.a.otter.canal.instance.spring.canalinstancewithspring - start cannalinstance for 1-example 2023-09-24 12:14:24.309 [main] warn c.a.o.canal.parse.inbound.mysql.dbsync.logeventconvert - --> init table filter : ^.*\..*$ 2023-09-24 12:14:24.309 [main] warn c.a.o.canal.parse.inbound.mysql.dbsync.logeventconvert - --> init table black filter : ^mysql\.slave_.*$ 2023-09-24 12:14:24.320 [main] info c.a.otter.canal.instance.core.abstractcanalinstance - start successful.... 2023-09-24 12:14:24.480 [destination = example , address = /127.0.0.1:3306 , eventparser] warn c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - ---> begin to find start position, it will be long time for reset or first position 2023-09-24 12:14:24.480 [destination = example , address = /127.0.0.1:3306 , eventparser] warn c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - prepare to find start position just show master status 2023-09-24 12:14:26.135 [destination = example , address = /127.0.0.1:3306 , eventparser] warn c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - ---> find start position successfully, entryposition[included=false,journalname=mysql-bin.000045,position=4,serverid=1,gtid=<null>,timestamp=1695526635000] cost : 1634ms , the next step is binlog dump
启动成功,接下来就是在springboot中创建客户端监控了。
canal客户端监控mysql信息,实现业务逻辑
1.配置pom.xml
<dependency>
<groupid>com.alibaba.otter</groupid>
<artifactid>canal.client</artifactid>
<version>1.1.0</version>
</dependency>

2.创建demo
package com.game.service;
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;
import java.net.inetsocketaddress;
import java.util.list;
public class canalclient {
public static void main(string args[]) {
// 创建链接:换成自己的数据库ip地址
canalconnector connector = canalconnectors.newsingleconnector(new inetsocketaddress("192.168.0.243",
11111), "example", "", "");
int batchsize = 1000;
int emptycount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalemptycount = 120;
while (emptycount < totalemptycount) {
message message = connector.getwithoutack(batchsize); // 获取指定数量的数据
long batchid = message.getid();
int size = message.getentries().size();
if (batchid == -1 || size == 0) {
emptycount++;
system.out.println("empty count : " + emptycount);
try {
thread.sleep(1000);
} catch (interruptedexception e) {
}
} else {
emptycount = 0;
printentry(message.getentries());
}
connector.ack(batchid); // 提交确认
}
system.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printentry(list<canalentry.entry> entrys) {
for (canalentry.entry entry : entrys) {
if (entry.getentrytype() == canalentry.entrytype.transactionbegin || entry.getentrytype() == canalentry.entrytype.transactionend) {
continue;
}
canalentry.rowchange rowchage = null;
try {
rowchage = canalentry.rowchange.parsefrom(entry.getstorevalue());
} catch (exception e) {
throw new runtimeexception("error ## parser of eromanga-event has an error , data:" + entry.tostring(),
e);
}
canalentry.eventtype eventtype = rowchage.geteventtype();
system.out.println(string.format("================> binlog[%s:%s] , name[%s,%s] , eventtype : %s",
entry.getheader().getlogfilename(), entry.getheader().getlogfileoffset(),
entry.getheader().getschemaname(), entry.getheader().gettablename(),
eventtype));
for (canalentry.rowdata rowdata : rowchage.getrowdataslist()) {
if (eventtype == canalentry.eventtype.delete) {
printcolumn(rowdata.getbeforecolumnslist());
} else if (eventtype == canalentry.eventtype.insert) {
printcolumn(rowdata.getaftercolumnslist());
} else {
system.out.println("-------> before");
printcolumn(rowdata.getbeforecolumnslist());
system.out.println("-------> after");
printcolumn(rowdata.getaftercolumnslist());
}
}
}
}
private static void printcolumn(list<canalentry.column> columns) {
for (canalentry.column column : columns) {
system.out.println(column.getname() + " : " + column.getvalue() + " update=" + column.getupdated());
}
}
}
3.测试删除一条记录


后面的就是监控数据同步redis,就不写了
到此这篇关于linux宝塔面板使用canal实现mysql和redis数据同步(图文教程)的文章就介绍到这了,更多相关linux使用canal实现mysql和redis同步内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论