当前位置: 代码网 > it编程>数据库>Mysql > 使用Canal监听MySQL Binlog日志的实现方案

使用Canal监听MySQL Binlog日志的实现方案

2024年12月18日 Mysql 我要评论
引入原始实现首先看下面的代码,这段代码的作用是关闭超时未支付的订单,包括两个步骤1、将订单状态修改为取消状态2、调用远程服务,恢复数据库和缓存中的库存@override@transactional(r

引入

原始实现

首先看下面的代码,这段代码的作用是关闭超时未支付的订单,包括两个步骤

1、将订单状态修改为取消状态

2、调用远程服务,恢复数据库和缓存中的库存

@override
@transactional(rollbackfor = throwable.class)
public void closeorder(string ordersn) {
    orderdo orderdo = basemapper.selectbyordersn(ordersn);
    if (orderdo.getorderstatus().equals(orderstatusconstant.un_paid)) {
        // --if-- 到时间了,订单还没有支付,取消该订单
        // 修改订单状态为取消状态
        orderdo.setorderstatus(orderstatusconstant.cancel);
        // 分片键不能更新
        orderdo.setvenueid(null);
        basemapper.updatebyordersn(orderdo);
        
        // 还原数据库和缓存中的库存
        result<orderdo> result = null;
        try {
            result = timeperiodfeignservice.release(timeperiodstockrestorereqdto.builder()
                    .timeperiodid(orderdo.gettimeperiodid())
                    .partitionid(orderdo.getpartitionid())
                    .courtindex(orderdo.getcourtindex())
                    .userid(orderdo.getuserid())
                    .build());
        } catch (exception e) {
            // --if-- 库存恢复远程接口调用失败
            throw new serviceexception(baseerrorcode.remote_error);
        }
        if (result != null && !result.issuccess()) {
            // 因为使用了transactional,如果这里出现了异常,订单的关闭修改会回退
            throw new serviceexception("调用远程服务释放时间段数据库库存失败", baseerrorcode.service_error);
        }
    }
}

存在问题

为了确保这两个步骤要么全部成功,要么全部失败,在这段代码中,使用了@transactional注解来管理本地数据库事务。如果说调用远程服务恢复库存时,调用失败,事务会进行回滚,即订单状态还是保持原样,不会被取消。然而,在分布式环境中,当涉及到调用远程服务时,@transactional只能保证本地事务的一致性,而不能保证跨服务的一致性。例如在极端情况下会出现如下问题:

  • 远程服务实际上已经成功处理了请求,完成了库存的恢复。
  • 但由于网络延迟或中断,本地服务未能接收到远程服务的成功响应。
  • 结果是本地服务认为库存恢复失败,触发了本地事务的回滚,使订单状态回到未取消的状态。

这种情况下,就会产生事务不一致的问题:库存已经被正确地恢复,但订单仍然处于可支付状态。这可能导致客户继续尝试支付一个实际上应该被取消的订单,或者导致库存数据与订单状态之间的不匹配。

替代方案

开启 mysql 的 binlog 日志,通过 canal 监听订单状态的变化并异步发送消息至消息队列。消费者从队列中接收消息后,如果检测到订单的状态是从未支付修改为已取消,就负责调用库存服务恢复商品库存。

这种方式解耦了订单服务与库存服务,提高了系统的容错性和处理效率,支持异步操作和流量削峰,确保了最终一致性,并通过幂等性设计保障了数据的准确性和系统的稳定性。

**为什么说确保了最终一致性?**当订单关闭之后,消息队列会保证消息至少被成功消费一次,即库存如果还原失败,消息队列会多次重发消息,如果达到重发上限可以接入人工来处理死信队列的消息

操作

mysql 开启 binlog

log-bin=mysql-bin # 开启 binlog
binlog-format=row # 选择 row 模式
server_id=1 # 配置 mysql replaction 需要定义,不要和 canal 的 slaveid 重复

在这里插入图片描述

通过 cmd 命令行窗口重启 mysql 数据库,让配置生效

c:\windows\system32>net stop mysql8
mysql8 服务正在停止..
mysql8 服务已成功停止。


c:\windows\system32>net start mysql8
mysql8 服务正在启动 .
mysql8 服务已经启动成功。

连接进入mysql之后,使用show variables like 'log_%';查看binlog启动是否成功,如果查询出来log_bin对应的值为on,说明启动成功

c:\windows\system32>mysql -u root -p12345678
mysql: [warning] using a password on the command line interface can be insecure.
welcome to the mysql monitor.  commands end with ; or \g.
your mysql connection id is 9
server version: 8.0.27 mysql community server - gpl

copyright (c) 2000, 2021, oracle and/or its affiliates.

oracle is a registered trademark of oracle corporation and/or its
affiliates. other names may be trademarks of their respective
owners.

type 'help;' or '\h' for help. type '\c' to clear the current input statement.

mysql> show variables like 'log_%';
+----------------------------------------+----------------------------------------------------------------------------------+
| variable_name                          | value                                                                            |
+----------------------------------------+----------------------------------------------------------------------------------+
| log_bin                                | on                                                                               |
| log_bin_basename                       | d:\development\sql\mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin           |
| log_bin_index                          | d:\development\sql\mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin.index     |
| log_bin_trust_function_creators        | off                                                                              |
| log_bin_use_v1_row_events              | off                                                                              |
| log_error                              | d:\development\sql\mysql\mysql8\exe\mysql-8.0.27-winx64\data\desktop-tqse9jo.err |
| log_error_services                     | log_filter_internal; log_sink_internal                                           |
| log_error_suppression_list             |                                                                                  |
| log_error_verbosity                    | 2                                                                                |
| log_output                             | file                                                                             |
| log_queries_not_using_indexes          | off                                                                              |
| log_raw                                | off                                                                              |
| log_replica_updates                    | on                                                                               |
| log_slave_updates                      | on                                                                               |
| log_slow_admin_statements              | off                                                                              |
| log_slow_extra                         | off                                                                              |
| log_slow_replica_statements            | off                                                                              |
| log_slow_slave_statements              | off                                                                              |
| log_statements_unsafe_for_binlog       | on                                                                               |
| log_throttle_queries_not_using_indexes | 0                                                                                |
| log_timestamps                         | utc                                                                              |
+----------------------------------------+----------------------------------------------------------------------------------+
21 rows in set, 1 warning (0.01 sec)

在这里插入图片描述

给canal创建一个单独使用的账号来进行 binlog 的同步和监听

create user canal identified by 'canal';  
grant select, replication slave, replication client on *.* to 'canal'@'%';
flush privileges;

执行成功,mysql的user表就多了一天canal的记录

mysql> create user canal identified by 'canal';  grant select, replication slave, replication client on *.* to 'canal'@'%';flush privileges;
query ok, 0 rows affected (0.01 sec)

query ok, 0 rows affected (0.00 sec)

query ok, 0 rows affected (0.00 sec)

在这里插入图片描述

canal 中间件

下载方式

源码地址:https://github.com/alibaba/canal

下载地址:https://github.com/alibaba/canal/releases

在这里插入图片描述

解压之后,目录如下:

在这里插入图片描述

修改配置文件

instance.properties

首先修改配置文件instance.properties

  • 修改canal.instance.master.address,指向真正的 mysql 的 ip 和端口
  • 修改canal.mq.topic,声明发送到消息队列的消息的 topic

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

如果你给canal提供的账号密码不是canal,需要修改

在这里插入图片描述

最后,并不是所有数据库或数据表改动我们都需要做出反应的,这里我只针对我需要监听的数据库和数据表即可,通过设置canal.instance.filter.regex进行过滤

我的设置为canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))$,解释如下:

  • ^ 表示字符串的开始。
  • (venue-reservation) 匹配名为 venue-reservation 的数据库。这里使用了括号 () 来创建一个捕获组。如果说项目使用分库,需要匹配多个数据库的话,可以这样写(venue-reservation_0|venue-reservation_1|venue-reservation_2)
  • \\. 匹配实际的点号,这是数据库名和表名之间的分隔符。
  • (time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1])) 匹配特定模式的表名。这里也是用括号创建了一个捕获组。因为项目对time_period_order进行了分表,所以需要这样设置。具体地:
    • time_period_order_ 匹配固定前缀 time_period_order_
    • ([0-9]|1[0-9]|2[0-9]|3[0-1]) 这一部分是用来匹配数字部分,看起来像是为了匹配类似于时间间隔或者编号的表。更具体地说:
      • [0-9] 匹配从0到9的任何数字。
      • 1[0-9] 匹配从10到19的两位数。
      • 2[0-9] 匹配从20到29的两位数。
      • 3[0-1] 匹配30或31。
  • $ 表示字符串的结束。

canal.properties

修改配置文件canal.properties

  • 因为我所使用的消息队列是rocketmq,首先将模式canal.servermode设置为rocketmq
  • rocketmq.namesrv.addr指向的rocketmq服务器指向正确的ip和端口

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

启动

启动canal,如果是win,直接双击startup.bat启动即可

在这里插入图片描述

下图启动之后弹出的窗口,如果要关闭 canal ,就点右上角的 x 即可

在这里插入图片描述

想要查看 canal 是否启动成功,可以通过日志文件查看,如果出现如下红色部分的输出,说明启动成功

在这里插入图片描述

测试

将订单状态从0改成2

在这里插入图片描述

去rocketmq中查看收到的消息,topic选择刚刚配置文件中设置的vrs_canal_common_topic

在这里插入图片描述

刚刚接收到的消息详情如下

在这里插入图片描述

使用json格式化工具查看

{
  "data": [
    {
      "order_sn": "1866821518450221056850432",
      "is_deleted": "0",
      "order_time": "2024-12-11 20:25:09",
      "venue_id": "1865271207637635072",
      "partition_id": "1865276571322015744",
      "partition_index": "0",
      "time_period_id": "1866776397058904064",
      "user_id": "1864637732760850432",
      "order_status": "2",
      "payment_method": null,
      "transaction_id": null,
      "pay_time": null,
      "pay_amount": null,
      "refund_status": null,
      "refund_amount": null,
      "refund_time": null
    }
  ],
  "database": "venue-reservation",
  "es": 1734228903000,
  "gtid": "",
  "id": 16,
  "isddl": false,
  "mysqltype": {
    "order_sn": "varchar(30)",
    "is_deleted": "tinyint",
    "order_time": "datetime",
    "venue_id": "bigint",
    "partition_id": "bigint",
    "partition_index": "int",
    "time_period_id": "bigint",
    "user_id": "bigint",
    "order_status": "tinyint",
    "payment_method": "tinyint",
    "transaction_id": "varchar(255)",
    "pay_time": "datetime",
    "pay_amount": "decimal(10,2)",
    "refund_status": "tinyint",
    "refund_amount": "decimal(10,2)",
    "refund_time": "datetime"
  },
  "old": [
    {
      "order_status": "0"
    }
  ],
  "pknames": [
    "order_sn"
  ],
  "sql": "",
  "sqltype": {
    "order_sn": 12,
    "is_deleted": -6,
    "order_time": 93,
    "venue_id": -5,
    "partition_id": -5,
    "partition_index": 4,
    "time_period_id": -5,
    "user_id": -5,
    "order_status": -6,
    "payment_method": -6,
    "transaction_id": 12,
    "pay_time": 93,
    "pay_amount": 3,
    "refund_status": -6,
    "refund_amount": 3,
    "refund_time": 93
  },
  "table": "time_period_order_0",
  "ts": 1734228903999,
  "type": "update"
}

在这里插入图片描述

消息监听处理

实体类

首先定义一个实体类,用来接收canal推送过来的消息

import lombok.data;

import java.util.list;
import java.util.map;

/**
 * 用来接收canal发送过来的消息的数据
 * @author dam
 * @create 2024/12/10 14:11
 */
@data
public class canalbinlogdto {
    /**
     * 变更之后的数据
     */
    private list<map<string, object>> data;

    /**
     * 数据库名称
     */
    private string database;

    /**
     * es 是指 mysql binlog 里原始的时间戳,也就是数据原始变更的时间
     * canal 的消费延迟 = ts - es
     */
    private long es;

    /**
     * 递增 id,从 1 开始
     */
    private long id;

    /**
     * 当前变更是否是 ddl 语句
     */
    private boolean isddl;

    /**
     * 表结构字段类型
     */
    private map<string, object> mysqltype;

    /**
     * 修改之前的旧数据
     */
    private list<map<string, object>> old;

    /**
     * 主键名称
     */
    private list<string> pknames;

    /**
     * sql 语句
     */
    private string sql;

    /**
     * sql 类型
     */
    private map<string, object> sqltype;

    /**
     * 表名
     */
    private string table;

    /**
     * ts 是指 canal 收到这个 binlog,构造为自己协议对象的时间
     * 应用消费的延迟 = now - ts
     */
    private long ts;

    /**
     * insert(新增)、update(更新)、delete(删除)等等
     */
    private string type;
}

监听

获取到消息之后,如果判断到所做的修改是update类型,而且修改的是订单号,即olddatamap.containskey("order_status"),则进一步判断是否为将订单号从0修改为2,如果是则调用恢复库存方法

import cn.hutool.core.util.objectutil;
import com.vrs.annotation.idempotent;
import com.vrs.constant.orderstatusconstant;
import com.vrs.constant.rocketmqconstant;
import com.vrs.domain.dto.mq.canalbinlogdto;
import com.vrs.domain.dto.req.timeperiodstockrestorereqdto;
import com.vrs.enums.idempotentsceneenum;
import com.vrs.service.timeperiodservice;
import lombok.requiredargsconstructor;
import lombok.sneakythrows;
import lombok.extern.slf4j.slf4j;
import org.apache.rocketmq.spring.annotation.messagemodel;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.component;

import java.util.map;

/**
 * @author dam
 * @create 2024/12/10 14:12
 */
@slf4j(topic = rocketmqconstant.canal_topic)
@component
@rocketmqmessagelistener(topic = rocketmqconstant.canal_topic,
        consumergroup = rocketmqconstant.canal_consumer_group,
        messagemodel = messagemodel.clustering
)
@requiredargsconstructor
public class canalbinlogcommonlistener implements rocketmqlistener<canalbinlogdto> {

    private final timeperiodservice timeperiodservice;

    /**
     * 消费消息的方法
     * 方法报错就会拒收消息
     *
     * @param messagewrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
     */
    @idempotent(
            uniquekeyprefix = "canal_binlog_common:",
            key = "#canalbinlogdto.getid()+''",
            scene = idempotentsceneenum.mq,
            keytimeout = 3600l
    )
    @sneakythrows
    @override
    public void onmessage(canalbinlogdto canalbinlogdto) {
        if (canalbinlogdto.getold() == null) {
            return;
        }
        map<string, object> alterdatamap = canalbinlogdto.getdata().get(0);
        map<string, object> olddatamap = canalbinlogdto.getold().get(0);
        if (objectutil.equal(canalbinlogdto.gettype(), "update") && olddatamap.containskey("order_status")) {
            log.info("[消费者] 消费canal的消息,恢复时间段的库存和空闲场号,时间段id:{}", alterdatamap.get("time_period_id"));
            long userid = long.parselong(alterdatamap.get("user_id").tostring());
            long timeperiodid = long.parselong(alterdatamap.get("time_period_id").tostring());
            long partitionid = long.parselong(alterdatamap.get("partition_id").tostring());
            long courtindex;
            if (alterdatamap.containskey("partition_index")) {
                courtindex = long.parselong(alterdatamap.get("partition_index").tostring());
            } else {
                courtindex = long.parselong(alterdatamap.get("court_index").tostring());
            }
            integer orderstatus = integer.parseint(alterdatamap.get("order_status").tostring());
            integer oldorderstatus = integer.parseint(olddatamap.get("order_status").tostring());
            if (orderstatus.equals(orderstatusconstant.cancel) && oldorderstatus.equals(orderstatusconstant.un_paid)) {
                // 恢复库存
                timeperiodservice.restorestockandbookedslots(timeperiodstockrestorereqdto.builder()
                        .userid(userid)
                        .courtindex(courtindex)
                        .timeperiodid(timeperiodid)
                        .partitionid(partitionid)
                        .build());
            }
        }
    }
}

以上就是使用canal监听mysql binlog日志的实现方案的详细内容,更多关于canal监听mysql binlog的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com