当前位置: 代码网 > it编程>前端脚本>Python > Python与MySQL实现数据库实时同步的详细步骤

Python与MySQL实现数据库实时同步的详细步骤

2025年08月21日 Python 我要评论
前言在日常开发中,数据同步是一项常见的需求,比如:跨库同步:主数据库与从数据库之间的数据同步。异构系统:将 mysql 数据同步到其他存储系统(如 elasticsearch)。实时备份:实现高可用性

前言

在日常开发中,数据同步是一项常见的需求,比如:

  • 跨库同步:主数据库与从数据库之间的数据同步。
  • 异构系统:将 mysql 数据同步到其他存储系统(如 elasticsearch)。
  • 实时备份:实现高可用性,保障数据安全。

本篇文章将使用 pythonmysql 来实现数据库实时同步。我们将围绕数据变更捕获数据处理数据写入 这三个核心环节展开,提供易于理解的代码实现和实用方案。

摘要

通过 python 结合 mysql 的技术栈,我们可以实现实时同步的功能。本文将详细介绍以下内容:

  1. 数据同步的实现方案与思路。
  2. 使用 mysql 的 binlog(日志) 实现数据变更捕获。
  3. 使用 python pymysqlmysql-connector 处理数据变更。
  4. 实现数据实时同步的代码示例。
  5. 数据同步的优化与注意事项。

概述:数据同步方案

1. 基本思路

要实现数据库实时同步,主要包含三个核心步骤:

  1. 捕获数据变更
    • 使用 mysql binlog(二进制日志)来监听数据变化。
  2. 处理数据变更
    • 使用 python 解析 binlog,提取变化的数据。
  3. 写入目标数据库
    • 将变更后的数据实时写入目标数据库或存储系统。

2. mysql binlog 简介

mysql 的 binlog 是记录所有数据库更新事件的二进制日志,主要用于:

  • 数据库备份与恢复。
  • 主从复制(replication)。
  • 数据变更捕获(cdc,change data capture)。

我们将利用 binlog 监听数据库数据的变更事件(如 insertupdatedelete),然后通过 python 解析这些事件并同步到目标数据库。

实现步骤与代码示例

1. 前置准备

  • 环境配置
    • python 环境(推荐 3.8+)
    • mysql 数据库(开启 binlog)
    • 必须安装以下 python 包:
pip install pymysql mysql-connector-python pymysqlreplication

2. 配置 mysql 开启 binlog

在 mysql 配置文件 my.cnfmy.ini 中添加以下配置,启用 binlog:

[mysqld]
log-bin=mysql-bin        # 开启 binlog 功能
server-id=1              # 唯一标识符,必须设置
binlog-format=row        # 使用行级日志,便于捕获数据变更

重启 mysql 服务后,执行以下命令验证 binlog 是否启用:

show variables like 'log_bin';
show variables like 'binlog_format';

3. 使用 python 监听 mysql binlog 并实现数据同步

代码实现

使用 pymysqlreplication 库监听 binlog 日志,捕获数据库变化并同步到目标数据库。

from pymysqlreplication import binlogstreamreader
from pymysqlreplication.row_event import deleterowsevent, writerowsevent, updaterowsevent
import pymysql

# 源数据库配置
source_config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "passwd": "password"
}

# 目标数据库配置
target_config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "passwd": "password",
    "database": "target_db"
}

# 连接目标数据库
def write_to_target_db(query, params):
    connection = pymysql.connect(**target_config)
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, params)
        connection.commit()
    finally:
        connection.close()

# 处理 binlog 事件
def process_binlog_event():
    stream = binlogstreamreader(
        connection_settings=source_config,
        server_id=100,   # 唯一 server_id
        blocking=true,   # 持续监听
        only_events=[writerowsevent, updaterowsevent, deleterowsevent]
    )

    for binlogevent in stream:
        for row in binlogevent.rows:
            if isinstance(binlogevent, writerowsevent):
                # insert 事件
                query = "insert into target_table (id, name, age) values (%s, %s, %s)"
                params = (row["values"]["id"], row["values"]["name"], row["values"]["age"])
                write_to_target_db(query, params)

            elif isinstance(binlogevent, updaterowsevent):
                # update 事件
                query = "update target_table set name=%s, age=%s where id=%s"
                params = (row["after_values"]["name"], row["after_values"]["age"], row["after_values"]["id"])
                write_to_target_db(query, params)

            elif isinstance(binlogevent, deleterowsevent):
                # delete 事件
                query = "delete from target_table where id=%s"
                params = (row["values"]["id"],)
                write_to_target_db(query, params)

    stream.close()

# 启动数据同步
if __name__ == "__main__":
    process_binlog_event()

代码解析

binlogstreamreader

  • 连接 mysql 并监听指定的 binlog 文件。
  • 参数 only_events 限定监听的事件类型(writerowseventupdaterowseventdeleterowsevent)。

数据捕获

  • insert:监听插入事件,将新数据写入目标表。
  • update:监听更新事件,根据主键更新目标表数据。
  • delete:监听删除事件,将对应数据从目标表删除。

数据写入

  • 使用 pymysql 将数据写入目标数据库。

实时监听

  • blocking=true 确保持续监听 binlog 变化,实现实时同步。

4. 数据同步优化与注意事项

binlog 格式

  • 使用 row 格式记录变更,确保捕获到详细的行级数据。

事务日志顺序

  • 保持事件顺序一致性,防止数据错乱。

异常处理

  • 添加异常捕获,防止程序中断时数据丢失。

性能优化

  • 对目标数据库进行批量插入和索引优化,提高写入性能。

应用场景

  1. 主从同步:实现 mysql 主数据库与从数据库的实时同步。
  2. 数据备份:实时备份数据库,防止数据丢失。
  3. 数据迁移:将 mysql 数据同步到其他存储系统(如 elasticsearch、redis 等)。
  4. 日志分析:实时捕获数据库变更,进行业务分析。

总结

通过本次实战,我们借助 mysql binlogpython 实现了数据的实时同步。在实际项目中,这种方案不仅高效稳定,而且易于扩展和维护。

关键点总结:

  • 开启 mysql binlog 并使用 row 格式。
  • 使用 python 库 pymysqlreplication 捕获数据变更。
  • 编写逻辑处理 insertupdatedelete 事件。
  • 将变更数据实时同步到目标数据库。

希望本文能帮助你在实际开发中快速实现数据库实时同步,提升数据管理效率!

以上就是python与mysql实现数据库实时同步的详细步骤的详细内容,更多关于python mysql数据库实时同步的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com