当前位置: 代码网 > it编程>数据库>Redis > Canal入门使用小结

Canal入门使用小结

2025年02月08日 Redis 我要评论
说明:canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,canal是一款实现

说明:canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,canal是一款实现数据同步的组件。可以实现数据库之间、数据库与redis、es之间的数据同步。本文介绍canal的入门使用。

canal介绍

canal实现原理是伪装成mysql主节点的从节点,接收主节点的binlog日志,解析、提取数据库操作,将对数据库的操作通过代码更新到其他组件中,如其他数据库、es、redis等,官方解释如下:

在这里插入图片描述

官方提供的结构图如下:

在这里插入图片描述

canal安装

首先,从官网上下载canal服务器,地址:https://github.com/alibaba/canal/releases

在这里插入图片描述

下载下来,解压,如下:

在这里插入图片描述

canal配置文件暂时不用管,先修改一下example文件中监测的目前节点配置,修改成自己需要监测的mysql配置,如下:

在这里插入图片描述

修改完,启动canal服务,双击startup.bat文件,如下:

在这里插入图片描述

canal使用

只要你的mysql服务器的ip、账号密码没输错,且测试过能用navicat或其他数据库连接工具成功连接数据库,那么就可以进行下面的编码工作了。

首先,创建一个maven项目,pom.xml如下,导个canal依赖就行了

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>

    <groupid>com.hezy</groupid>
    <artifactid>canal_demo</artifactid>
    <version>1.0-snapshot</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceencoding>utf-8</project.build.sourceencoding>
    </properties>

    <dependencies>
        <!--canal客户端-->
        <dependency>
            <groupid>top.javatool</groupid>
            <artifactid>canal-spring-boot-starter</artifactid>
            <version>1.2.1-release</version>
        </dependency>
    </dependencies>

</project>

测试代码如下,用来连接canal服务器,打印canal监测到的数据内容;

import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;
import com.google.protobuf.bytestring;
import com.google.protobuf.invalidprotocolbufferexception;

import java.net.inetsocketaddress;
import java.util.hashmap;
import java.util.list;

/**
 * canal处理器
 * 作用:打印canal服务器监测到的数据
 */
public class canalhandler {
    public static void main(string[] args) throws invalidprotocolbufferexception {

        // 1.创建连接
        canalconnector canalconnector = canalconnectors
                .newsingleconnector(new inetsocketaddress("localhost", 11111), "example", "", "");

        // 2.抓取数据
        while (true) {

            // 3.开始连接
            canalconnector.connect();

            // 4.订阅数据,所有的库和表
            canalconnector.subscribe(".*\\..*");

            // 5.抓取数据,每次抓取100条
            message message = canalconnector.get(100);

            // 6.获取entry集合
            list<canalentry.entry> entries = message.getentries();

            // 7.判断是否有数据
            if (entries.size() == 0) {
                system.out.println(">>>暂无数据<<<");
                try {
                    thread.sleep(1000);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
            } else {
                // 8.解析数据
                for (canalentry.entry entry : entries) {
                    // 获取表名
                    string tablename = entry.getheader().gettablename();
                    // 获取操作类型
                    canalentry.entrytype entrytype = entry.getentrytype();

                    // 判断entrytype是否为rowdata
                    if (canalentry.entrytype.rowdata.equals(entrytype)) {
                        // 序列化数据
                        bytestring storevalue = entry.getstorevalue();
                        // 反序列化数据
                        canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(storevalue);

                        // 获取事件类型
                        canalentry.eventtype eventtype = rowchange.geteventtype();
                        // 获取具体的数据
                        list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist();
                        // 遍历打印
                        for (canalentry.rowdata rowdata : rowdataslist) {
                            // 获取拉取前后的数据
                            list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist();
                            list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist();

                            // 用map存储每条数据
                            hashmap<string, object> beforemap = new hashmap<>();
                            hashmap<string, object> aftermap = new hashmap<>();

                            // 获取不同操作的数据
                            if (canalentry.eventtype.insert.equals(eventtype)) {
                                system.out.println("【" + tablename + "】表插入数据");
                                for (canalentry.column column : aftercolumnslist) {
                                    aftermap.put(column.getname(), column.getvalue());
                                }
                                system.out.println("新增数据:" + aftermap);
                            } else if (canalentry.eventtype.update.equals(eventtype)) {
                                system.out.println("【" + tablename + "】表更新数据");
                                for (canalentry.column column : beforecolumnslist) {
                                    beforemap.put(column.getname(), column.getvalue());
                                }
                                system.out.println("更新前:" + beforemap);
                                system.out.println("----");
                                for (canalentry.column column : aftercolumnslist) {
                                    aftermap.put(column.getname(), column.getvalue());
                                }
                                system.out.println("更新后:" + aftermap);
                            } else if (canalentry.eventtype.delete.equals(eventtype)) {
                                system.out.println("【" + tablename + "】表删除数据");
                                for (canalentry.column column : beforecolumnslist) {
                                    beforemap.put(column.getname(), column.getvalue());
                                }
                                system.out.println("被删除的数据:" + beforemap);
                            }
                        }
                    }
                }
            }
        }
    }
}

启动程序,查看控制台,检测中……

在这里插入图片描述

使用navicat连接数据库,查看数据库test库,i_user表内容;

在这里插入图片描述

此时,我们新增一条数据,看控制台,canal成功接收到了这次修改;

在这里插入图片描述

更新数据;

在这里插入图片描述

删除数据;

在这里插入图片描述

头能过身体就能过,接下来不就好办了。将canal接收到的数据转为对象,根据不同的操作类型分发给自己想要同步的组件,同步给从mysql,就调用对应的mapper;同步给redis,就调用redis对应的方法,es同样。

总结

本文介绍了canal入门使用,参考b站视频:canal极简入门:一小时让你快速上手canal数据同步神技~

到此这篇关于canal入门使用小结的文章就介绍到这了,更多相关canal入门内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支

(0)

相关文章:

  • redis-cli命令行工具的使用小结

    redis-cli命令行工具的使用小结

    redis-cli命令行工具是一个功能强大的redis客户端,它允许用户与redis数据库进行交互和管理。以下是一些常用参数的使用说明:基本连接参数-h, --... [阅读全文]
  • 深入理解Redis大key的危害及解决方案

    一、背景redis作为后端开发中的一个常用组件,在开发过程中承担着非常重要的作用。在其实际使用过程中,我们常常会面临一些技术挑战,其中常见的问题就包括大key问题。当某些数据量较大…

    2025年01月19日 数据库
  • Redis使用SETNX命令实现分布式锁

    Redis使用SETNX命令实现分布式锁

    什么是分布式锁分布式锁是一种用于在分布式系统中控制多个节点对共享资源进行访问的机制。在分布式系统中,由于多个节点可能同时访问和修改同一个资源,因此需要一种方法来... [阅读全文]
  • Redis主从复制的原理分析

    Redis主从复制的原理分析

    redis主从复制的原理主从复制概述在现代分布式系统中,redis作为一款高性能的内存数据库,其主从复制功能是确保数据高可用性和扩展性的关键技术之一。通过主从复... [阅读全文]
  • Redis缓存异常之缓存雪崩问题解读

    缓存异常:缓存雪崩、击穿、穿透当发生缓存雪崩或击穿时,数据库中还是保存了应用要访问的数据。缓存击穿,缓存更数据库中都没有应用要访问的数据。1.缓存雪崩1.1了解缓存雪崩是指大量的应…

    2025年01月16日 数据库
  • Redis哨兵机制的使用详解

    一.哨兵机制基本解读主库发生故障了,如何不间断的服务?哨兵模式:有效的解决主从库自动切换的关键机制在redis中如果从库发生故障了,客户端可以继续向主库和其他从库发消息,进行相关操…

    2025年01月16日 数据库

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com