说明:canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,canal是一款实现数据同步的组件。可以实现数据库之间、数据库与redis、es之间的数据同步。本文介绍canal的入门使用。
canal介绍
canal实现原理是伪装成mysql主节点的从节点,接收主节点的binlog日志,解析、提取数据库操作,将对数据库的操作通过代码更新到其他组件中,如其他数据库、es、redis等,官方解释如下:
官方提供的结构图如下:
canal安装
首先,从官网上下载canal服务器,地址:https://github.com/alibaba/canal/releases
下载下来,解压,如下:
canal配置文件暂时不用管,先修改一下example
文件中监测的目前节点配置,修改成自己需要监测的mysql配置,如下:
修改完,启动canal服务,双击startup.bat
文件,如下:
canal使用
只要你的mysql服务器的ip、账号密码没输错,且测试过能用navicat或其他数据库连接工具成功连接数据库,那么就可以进行下面的编码工作了。
首先,创建一个maven项目,pom.xml如下,导个canal依赖就行了
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <groupid>com.hezy</groupid> <artifactid>canal_demo</artifactid> <version>1.0-snapshot</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <project.build.sourceencoding>utf-8</project.build.sourceencoding> </properties> <dependencies> <!--canal客户端--> <dependency> <groupid>top.javatool</groupid> <artifactid>canal-spring-boot-starter</artifactid> <version>1.2.1-release</version> </dependency> </dependencies> </project>
测试代码如下,用来连接canal服务器,打印canal监测到的数据内容;
import com.alibaba.otter.canal.client.canalconnector; import com.alibaba.otter.canal.client.canalconnectors; import com.alibaba.otter.canal.protocol.canalentry; import com.alibaba.otter.canal.protocol.message; import com.google.protobuf.bytestring; import com.google.protobuf.invalidprotocolbufferexception; import java.net.inetsocketaddress; import java.util.hashmap; import java.util.list; /** * canal处理器 * 作用:打印canal服务器监测到的数据 */ public class canalhandler { public static void main(string[] args) throws invalidprotocolbufferexception { // 1.创建连接 canalconnector canalconnector = canalconnectors .newsingleconnector(new inetsocketaddress("localhost", 11111), "example", "", ""); // 2.抓取数据 while (true) { // 3.开始连接 canalconnector.connect(); // 4.订阅数据,所有的库和表 canalconnector.subscribe(".*\\..*"); // 5.抓取数据,每次抓取100条 message message = canalconnector.get(100); // 6.获取entry集合 list<canalentry.entry> entries = message.getentries(); // 7.判断是否有数据 if (entries.size() == 0) { system.out.println(">>>暂无数据<<<"); try { thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } } else { // 8.解析数据 for (canalentry.entry entry : entries) { // 获取表名 string tablename = entry.getheader().gettablename(); // 获取操作类型 canalentry.entrytype entrytype = entry.getentrytype(); // 判断entrytype是否为rowdata if (canalentry.entrytype.rowdata.equals(entrytype)) { // 序列化数据 bytestring storevalue = entry.getstorevalue(); // 反序列化数据 canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(storevalue); // 获取事件类型 canalentry.eventtype eventtype = rowchange.geteventtype(); // 获取具体的数据 list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); // 遍历打印 for (canalentry.rowdata rowdata : rowdataslist) { // 获取拉取前后的数据 list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); // 用map存储每条数据 hashmap<string, object> beforemap = new hashmap<>(); hashmap<string, object> aftermap = new hashmap<>(); // 获取不同操作的数据 if (canalentry.eventtype.insert.equals(eventtype)) { system.out.println("【" + tablename + "】表插入数据"); for (canalentry.column column : aftercolumnslist) { aftermap.put(column.getname(), column.getvalue()); } system.out.println("新增数据:" + aftermap); } else if (canalentry.eventtype.update.equals(eventtype)) { system.out.println("【" + tablename + "】表更新数据"); for (canalentry.column column : beforecolumnslist) { beforemap.put(column.getname(), column.getvalue()); } system.out.println("更新前:" + beforemap); system.out.println("----"); for (canalentry.column column : aftercolumnslist) { aftermap.put(column.getname(), column.getvalue()); } system.out.println("更新后:" + aftermap); } else if (canalentry.eventtype.delete.equals(eventtype)) { system.out.println("【" + tablename + "】表删除数据"); for (canalentry.column column : beforecolumnslist) { beforemap.put(column.getname(), column.getvalue()); } system.out.println("被删除的数据:" + beforemap); } } } } } } } }
启动程序,查看控制台,检测中……
使用navicat连接数据库,查看数据库test库,i_user表内容;
此时,我们新增一条数据,看控制台,canal成功接收到了这次修改;
更新数据;
删除数据;
头能过身体就能过,接下来不就好办了。将canal接收到的数据转为对象,根据不同的操作类型分发给自己想要同步的组件,同步给从mysql,就调用对应的mapper;同步给redis,就调用redis对应的方法,es同样。
总结
本文介绍了canal入门使用,参考b站视频:canal极简入门:一小时让你快速上手canal数据同步神技~
到此这篇关于canal入门使用小结的文章就介绍到这了,更多相关canal入门内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支
发表评论