当前位置: 代码网 > it编程>数据库>Mysql > Linux宝塔面板使用Canal实现Mysql和Redis数据同步(图文教程)

Linux宝塔面板使用Canal实现Mysql和Redis数据同步(图文教程)

2025年12月21日 Mysql 我要评论
前言最近有一个项目,频繁读配置文件,如果用mysql怕压力大,一直用的redis,用redis虽然解决了读的问题,但是修改的话也要单独写配置,同时在其他环境配置也不方便保存,为此找了mysql和red

前言

最近有一个项目,频繁读配置文件,如果用mysql怕压力大,一直用的redis,用redis虽然解决了读的问题,但是修改的话也要单独写配置,同时在其他环境配置也不方便保存,为此找了mysql和redis同步方案canal。

canal 是什么

canal 是阿里巴巴开源的一款基于 mysql 数据库二进制日志(binlog)增量订阅和消费的中间件。它的名字来源于 "canal"(运河、通道),寓意着在数据库和应用之间建立一条数据同步的通道。

工作原理流程:
mysql 主库 → binlog(二进制日志) → canal server(伪装成从库) → 解析日志 → 发送数据 → 各种下游系统

技术原理

  • canal 模拟 mysql slave 的交互协议

  • 向 mysql master 发送 dump 请求

  • mysql master 收到请求后,开始推送 binlog 给 canal

  • canal 解析 binlog 对象(原始为 byte 流)

  • 将解析后的数据发送到消息队列或其他下游系统

主要应用场景

应用场景具体说明典型案例
数据库镜像实时备份数据库数据主从数据库同步、异地多活
数据库实时备份增量数据的实时备份灾备系统、数据恢复
多级索引构建同步数据到搜索引擎elasticsearch、solr 索引更新
业务缓存刷新数据库变更后更新缓存redis 缓存一致性维护
价格变化监控监控特定数据的变化电商价格监控、库存预警
增量数据订阅为其他系统提供增量数据数据仓库 etl、大数据分析
异地数据同步跨机房、跨地域数据同步全球化业务数据同步

linux系统环境准备

1.宝塔面板mysql修改配置

server-id=1    #master端的id号【必须是唯一的】;
log_bin=mysql-bin    #同步的日志路径,一定注意这个目录要是mysql有权限写入的
binlog-format=row    #行级,记录每次操作后每行记录的变化。
binlog-do-db=game_mqtt  #指定库,缩小监控的范围。

查看是否生效:

show variables like 'log_bin';

 查看正在写入的binlog文件状态:

show master status;

 

2. mysql为canal配置权限

在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限

账号密码都是:canal

grant select, replication slave, replication client on *.* to 'canal'@'%' identified by 'canal' ;

3.安装canal服务端

 安装 canal.deployer-1.1.6.tar.gz

官网下载地址https://github.com/alibaba/canal/releases

备用下载地址:https://download.csdn.net/download/qq_33215204/88369947

 将文件上传服务器目录   /data/soft/canal

解压文件

tar -zxvf canal.deployer-1.1.6.tar.gz 

4.配置canal

查看 conf/canal.properties 配置,发现需要暴漏三个端口

 canal.admin.port = 11110
 canal.port = 11111
 canal.metrics.pull.port = 11112

conf/canal.properties 配置

# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example
 

修改 conf/example/instance.properties 实例配置 

# 配置 slaveid 自定义,不等于 mysql 的 server id 即可
canal.instance.mysql.slaveid=10 

# 数据库地址:自己的数据库ip+端口
canal.instance.master.address=127.0.0.1:3306 
 
# 数据库用户名和密码 
canal.instance.dbusername=canal
canal.instance.dbpassword=canal

#代表数据库的编码方式对应到 java 中的编码类型,比如 utf-8,gbk , iso-8859-1
canal.instance.connectioncharset = utf-8
	
# 指定库和表,这里的 .* 表示 canal.instance.master.address 下面的所有数据库
canal.instance.filter.regex=.*\\..*

 配置完成之后启动

5.启动canal

cd /data/soft/canal/bin

./startup.sh

启动完成之后去看下有没有日志信息。

2023-09-24 12:14:24.280 [main] info  c.a.otter.canal.instance.spring.canalinstancewithspring - start cannalinstance for 1-example 
2023-09-24 12:14:24.309 [main] warn  c.a.o.canal.parse.inbound.mysql.dbsync.logeventconvert - --> init table filter : ^.*\..*$
2023-09-24 12:14:24.309 [main] warn  c.a.o.canal.parse.inbound.mysql.dbsync.logeventconvert - --> init table black filter : ^mysql\.slave_.*$
2023-09-24 12:14:24.320 [main] info  c.a.otter.canal.instance.core.abstractcanalinstance - start successful....
2023-09-24 12:14:24.480 [destination = example , address = /127.0.0.1:3306 , eventparser] warn  c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - ---> begin to find start position, it will be long time for reset or first position
2023-09-24 12:14:24.480 [destination = example , address = /127.0.0.1:3306 , eventparser] warn  c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - prepare to find start position just show master status
2023-09-24 12:14:26.135 [destination = example , address = /127.0.0.1:3306 , eventparser] warn  c.a.o.c.p.inbound.mysql.rds.rdsbinlogeventparserproxy - ---> find start position successfully, entryposition[included=false,journalname=mysql-bin.000045,position=4,serverid=1,gtid=<null>,timestamp=1695526635000] cost : 1634ms , the next step is binlog dump

 启动成功,接下来就是在springboot中创建客户端监控了。

canal客户端监控mysql信息,实现业务逻辑

1.配置pom.xml

<dependency>
    <groupid>com.alibaba.otter</groupid>
    <artifactid>canal.client</artifactid>
    <version>1.1.0</version>
</dependency>
 

2.创建demo

package com.game.service;

import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;

import java.net.inetsocketaddress;
import java.util.list;

public class canalclient {
    public static void main(string args[]) {
        // 创建链接:换成自己的数据库ip地址
        canalconnector connector = canalconnectors.newsingleconnector(new inetsocketaddress("192.168.0.243",
                11111), "example", "", "");
        int batchsize = 1000;
        int emptycount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalemptycount = 120;
            while (emptycount < totalemptycount) {
                message message = connector.getwithoutack(batchsize); // 获取指定数量的数据
                long batchid = message.getid();
                int size = message.getentries().size();
                if (batchid == -1 || size == 0) {
                    emptycount++;
                    system.out.println("empty count : " + emptycount);
                    try {
                        thread.sleep(1000);
                    } catch (interruptedexception e) {
                    }
                } else {
                    emptycount = 0;
                    printentry(message.getentries());
                }

                connector.ack(batchid); // 提交确认
            }

            system.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printentry(list<canalentry.entry> entrys) {
        for (canalentry.entry entry : entrys) {
            if (entry.getentrytype() == canalentry.entrytype.transactionbegin || entry.getentrytype() == canalentry.entrytype.transactionend) {
                continue;
            }

            canalentry.rowchange rowchage = null;
            try {
                rowchage = canalentry.rowchange.parsefrom(entry.getstorevalue());
            } catch (exception e) {
                throw new runtimeexception("error ## parser of eromanga-event has an error , data:" + entry.tostring(),
                        e);
            }

            canalentry.eventtype eventtype = rowchage.geteventtype();
            system.out.println(string.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventtype : %s",
                    entry.getheader().getlogfilename(), entry.getheader().getlogfileoffset(),
                    entry.getheader().getschemaname(), entry.getheader().gettablename(),
                    eventtype));

            for (canalentry.rowdata rowdata : rowchage.getrowdataslist()) {
                if (eventtype == canalentry.eventtype.delete) {
                    printcolumn(rowdata.getbeforecolumnslist());
                } else if (eventtype == canalentry.eventtype.insert) {
                    printcolumn(rowdata.getaftercolumnslist());
                } else {
                    system.out.println("-------&gt; before");
                    printcolumn(rowdata.getbeforecolumnslist());
                    system.out.println("-------&gt; after");
                    printcolumn(rowdata.getaftercolumnslist());
                }
            }
        }
    }

    private static void printcolumn(list<canalentry.column> columns) {
        for (canalentry.column column : columns) {
            system.out.println(column.getname() + " : " + column.getvalue() + "    update=" + column.getupdated());
        }
    }
}


3.测试删除一条记录

后面的就是监控数据同步redis,就不写了 

到此这篇关于linux宝塔面板使用canal实现mysql和redis数据同步(图文教程)的文章就介绍到这了,更多相关linux使用canal实现mysql和redis同步内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com