当前位置: 代码网 > it编程>前端脚本>Python > Python MySQL如何通过Binlog获取变更记录恢复数据

Python MySQL如何通过Binlog获取变更记录恢复数据

2025年01月23日 Python 我要评论
python mysql通过binlog获取变更记录恢复数据通过mysql的二进制日志(binlog)获取数据库的变更记录,并用于恢复数据,是一个相对高级的操作。这通常涉及读取binlog中的事件,解

python mysql通过binlog获取变更记录恢复数据

通过mysql的二进制日志(binlog)获取数据库的变更记录,并用于恢复数据,是一个相对高级的操作。

这通常涉及读取binlog中的事件,解析这些事件以了解数据变更的详细信息,然后基于这些信息来恢复或回滚数据。

在python中,你可以使用pymysqlreplication库来读取binlog,但请注意,这个库本身并不提供直接的数据恢复功能。它只能帮助你解析binlog中的事件。恢复数据需要你根据这些事件编写额外的逻辑。

以下是一个使用pymysqlreplication库通过binlog获取mysql操作记录的示例:

1.安装pymysqlreplication

首先,你需要安装这个库。你可以使用pip来安装:

pip install pymysqlreplication

2.配置mysql

确保你的mysql服务器启用了binlog,并且你有一个具有足够权限的mysql用户来读取binlog。

3.编写python脚本

下面是一个简单的python脚本,它使用pymysqlreplication.binlogstreamreader来读取binlog事件,并打印出插入、更新和删除操作的信息。

import json
import sys
from datetime import datetime
from pymysqlreplication import binlogstreamreader
from pymysqlreplication.row_event import (
    deleterowsevent,
    updaterowsevent,
    writerowsevent,
)
import pandas as pd

mysql_settings = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'password': '123456'
}
# 要监控的数据库和表 ssc_sjzz2
database_name = 'ssc_wfg'
table_name = 't_sys_user'

def default(o):
    if isinstance(o, datetime):
        return o.isoformat()
    raise typeerror("unserializable object {}".format(o))
def main():
    stream = binlogstreamreader(
        connection_settings=mysql_settings,
        server_id=6, # 必须与mysql服务器上的其他复制客户端不同
        only_events=[deleterowsevent, writerowsevent, updaterowsevent],
        only_tables=[table_name],
        only_schemas=[database_name]
        ) 
    df = pd.dataframe()
    for binlogevent in stream:
        if binlogevent.table == table_name and binlogevent.schema == database_name:
            time = binlogevent.formatted_timestamp.replace('t', ' ')
            timestamp= binlogevent.timestamp
            for row in binlogevent.rows:
                event = {
                    "schema": binlogevent.schema, 
                    "table": binlogevent.table,
                    "time": time,
                    "timestamp": timestamp,
                    }
            
                if isinstance(binlogevent, deleterowsevent):
                    event["action"] = "delete"
                    event["value"] = json.dumps(list(row["values"].items()), default=default)
                    # event = dict(event.items() + row["values"].items())
                elif isinstance(binlogevent, updaterowsevent):
                    event["action"] = "update"
                    event["value"] = json.dumps(list(row["after_values"].items()), default=default)
                    # event = dict(event.items() + row["after_values"].items())
                elif isinstance(binlogevent, writerowsevent):
                    event["action"] = "insert"
                    event["value"] = json.dumps(list(row["values"].items()), default=default)
                print(json.dumps(event, default=default))
                df = pd.concat([df, pd.dataframe(event, index=[0])], ignore_index=true)
    stream.close()
    df.to_excel('binlog.xlsx', index=false)
if __name__ == "__main__":
    main()

在这个脚本中:

  • mysql_settings包含了连接到mysql服务器所需的设置。
  • binlogstreamreader:包含了读取binlog所需的设置,包括server_id(必须是一个唯一的标识符,用于区分不同的复制客户端)和only_events(指定我们感兴趣的事件类型)。
  • stream函数根据事件的类型(删除、更新或插入)打印出相应的sql语句。
  • main:函数设置了binlog流读取器,并在捕获到任何异常时优雅地关闭流。
  • pandas:将结果输出到excel表格中,用于数据进行分析处理。

4.运行脚本

运行这个python脚本,它将连接到你的mysql服务器,并开始读取binlog中的事件。

每当有新的事件发生时(如插入、更新或删除操作),它都会打印出相应的sql语句。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com