引言
在数据驱动的现代应用中,数据库同步是确保数据一致性和可用性的关键环节。MySQL作为最流行的开源关系型数据库之一,其数据同步需求广泛存在于主从复制、数据迁移、备份恢复等场景。本文将详细介绍如何使用Python实现高效可靠的MySQL数据同步方案,涵盖基础同步方法、增量同步策略以及错误处理机制。
一、准备工作
1. 环境配置
首先确保已安装:
- Python 3.6+
- MySQL服务器(源库和目标库)
- 必要的Python库:
pip install pymysql sqlalchemy sshtunnel  # 基本依赖
pip install pandas mysql-connector-python mysql-replication  # 高级功能可选(mysql-replication 用于 binlog 实时同步)
2. 数据库连接配置
创建配置文件db_config.py:
import os

# Connection settings used by the sync examples; each dict is passed as
# keyword arguments to pymysql.connect(). Every value can be overridden by an
# environment variable so real credentials need not live in source control;
# the literals below are only placeholders.
source_db = {
    'host': os.environ.get('SOURCE_DB_HOST', 'source_host'),
    'user': os.environ.get('SOURCE_DB_USER', 'username'),
    'password': os.environ.get('SOURCE_DB_PASSWORD', 'password'),
    'database': os.environ.get('SOURCE_DB_NAME', 'db_name'),
    'port': int(os.environ.get('SOURCE_DB_PORT', '3306')),
    'charset': 'utf8mb4',
}

target_db = {
    'host': os.environ.get('TARGET_DB_HOST', 'target_host'),
    'user': os.environ.get('TARGET_DB_USER', 'username'),
    'password': os.environ.get('TARGET_DB_PASSWORD', 'password'),
    'database': os.environ.get('TARGET_DB_NAME', 'db_name'),
    'port': int(os.environ.get('TARGET_DB_PORT', '3306')),
    # Pin the same charset as the source so both connections encode text
    # identically regardless of the driver's default (4-byte UTF-8 such as
    # emoji needs utf8mb4).
    'charset': 'utf8mb4',
}
二、基础同步方法
方法1:使用PyMySQL全量同步
import pymysql
from db_config import source_db, target_db
def full_sync(source_config, target_config):
    """Copy every base table (schema and data) from the source to the target DB.

    Each target table is dropped, recreated from the source's
    ``SHOW CREATE TABLE`` output, and refilled with all source rows.
    Errors are printed rather than raised.

    Args:
        source_config: keyword arguments for ``pymysql.connect`` (source DB).
        target_config: keyword arguments for ``pymysql.connect`` (target DB).
    """

    def quote_ident(name):
        # Backtick-quote identifiers so reserved words (e.g. `order`) and
        # unusual names work; embedded backticks are doubled per MySQL rules.
        return "`" + str(name).replace("`", "``") + "`"

    source_conn = None
    target_conn = None
    try:
        source_conn = pymysql.connect(**source_config)
        target_conn = pymysql.connect(**target_config)
        with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
            # Only base tables: views hold no data of their own and would
            # make the INSERT step below fail.
            src_cursor.execute("SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'")
            tables = [row[0] for row in src_cursor.fetchall()]

            # Tables are dropped/recreated in arbitrary order, so foreign keys
            # between them must not be enforced during the copy. This is a
            # session variable: it only affects this target connection.
            tgt_cursor.execute("SET FOREIGN_KEY_CHECKS = 0")

            for table in tables:
                print(f"同步表: {table}")
                quoted_table = quote_ident(table)

                # Recreate the table on the target from the source definition.
                # DDL auto-commits in MySQL, so the DROP cannot be rolled back.
                src_cursor.execute(f"SHOW CREATE TABLE {quoted_table}")
                create_table_sql = src_cursor.fetchone()[1]
                tgt_cursor.execute(f"DROP TABLE IF EXISTS {quoted_table}")
                tgt_cursor.execute(create_table_sql)

                src_cursor.execute(f"SELECT * FROM {quoted_table}")
                rows = src_cursor.fetchall()
                if rows:
                    columns = [quote_ident(desc[0]) for desc in src_cursor.description]
                    placeholders = ", ".join(["%s"] * len(columns))
                    insert_sql = (
                        f"INSERT INTO {quoted_table} ({', '.join(columns)}) "
                        f"VALUES ({placeholders})"
                    )
                    tgt_cursor.executemany(insert_sql, rows)
                target_conn.commit()

            tgt_cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
    except Exception as e:
        # Uncommitted inserts of the failing table are discarded when the
        # connection is closed below.
        print(f"同步失败: {str(e)}")
    finally:
        if source_conn is not None:
            source_conn.close()
        if target_conn is not None:
            target_conn.close()
# Run the (destructive) full sync only when executed as a script, never as a
# side effect of importing this module.
if __name__ == "__main__":
    full_sync(source_db, target_db)
方法2:使用SQLAlchemy(ORM方式)
from sqlalchemy import MetaData, create_engine, text
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker
from db_config import source_db, target_db


def _mysql_url(cfg):
    """Build a ``mysql+pymysql`` SQLAlchemy URL from a db_config-style dict.

    ``URL.create`` escapes special characters in the password (``@``, ``:``,
    ``/`` ...), which plain f-string interpolation does not.
    """
    return URL.create(
        "mysql+pymysql",
        username=cfg['user'],
        password=cfg['password'],
        host=cfg['host'],
        port=cfg['port'],
        database=cfg['database'],
        query={"charset": cfg.get('charset', 'utf8mb4')},
    )


def orm_sync():
    """Mirror every table of ``source_db`` into ``target_db`` via SQLAlchemy.

    The source schema is reflected, then each target table is truncated and
    refilled with the source rows. Written for the SQLAlchemy 1.4/2.0 API
    (no bound MetaData, no ``Engine.execute``, raw SQL wrapped in ``text()``).
    Errors are printed rather than raised. Note that TRUNCATE auto-commits in
    MySQL, so the rollback on error cannot restore truncated tables.
    """
    source_engine = create_engine(_mysql_url(source_db))
    target_engine = create_engine(_mysql_url(target_db))
    target_session = sessionmaker(bind=target_engine)()
    quote = target_engine.dialect.identifier_preparer.quote_identifier
    try:
        source_meta = MetaData()
        source_meta.reflect(bind=source_engine)

        # MySQL refuses to TRUNCATE a table referenced by a foreign key unless
        # FK checks are off for this session.
        target_session.execute(text("SET FOREIGN_KEY_CHECKS = 0"))
        with source_engine.connect() as source_conn:
            # sorted_tables lists parent tables before their dependants.
            for table in source_meta.sorted_tables:
                print(f"处理表: {table.name}")
                # 清空目标表(生产环境应考虑更安全的策略)
                target_session.execute(text(f"TRUNCATE TABLE {quote(table.name)}"))

                rows = source_conn.execute(table.select()).mappings().all()
                if rows:
                    # A list of dicts makes this an executemany (batch insert).
                    target_session.execute(table.insert(), [dict(row) for row in rows])
        target_session.commit()
    except Exception as e:
        target_session.rollback()
        print(f"同步错误: {str(e)}")
    finally:
        target_session.close()
        # Close pooled connections (this also drops the session-level
        # FOREIGN_KEY_CHECKS override).
        source_engine.dispose()
        target_engine.dispose()
三、增量同步策略
1. 基于时间戳的增量同步
def incremental_sync(last_sync_time, timestamp_column='update_time'):
    """Upsert rows changed since ``last_sync_time`` from source to target.

    Every base table on the source that has ``timestamp_column`` is scanned
    for rows with ``timestamp_column >= last_sync_time``. Those rows are
    written to the same-named target table with
    ``INSERT ... ON DUPLICATE KEY UPDATE``. Tables lacking the column are
    skipped.

    Args:
        last_sync_time: watermark returned by the previous run (a
            ``'YYYY-MM-DD HH:MM:SS'`` string or ``datetime``); ``None`` copies
            every row of each eligible table.
        timestamp_column: name of the last-modified column to filter on.

    Returns:
        The new watermark as a ``'%Y-%m-%d %H:%M:%S'`` string, read from the
        source server before any data is read. Persist it and pass it back as
        ``last_sync_time`` next time. Returns ``None`` if the sync failed, in
        which case the previous watermark should be kept.
    """

    def quote_ident(name):
        # Backtick-quote identifiers, doubling embedded backticks.
        return "`" + str(name).replace("`", "``") + "`"

    source_conn = None
    target_conn = None
    try:
        source_conn = pymysql.connect(**source_db)
        target_conn = pymysql.connect(**target_db)
        with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
            # Take the next watermark from the source clock *before* reading:
            # rows modified while this run is in progress are then picked up
            # next time, and app/DB clock skew does not matter.
            # NOTE(review): assumes timestamp_column holds times in the source
            # session time zone (the one NOW() uses) — confirm.
            src_cursor.execute("SELECT NOW()")
            new_watermark = src_cursor.fetchone()[0]

            src_cursor.execute("SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'")
            tables = [row[0] for row in src_cursor.fetchall()]

            for table in tables:
                # Skip tables without the timestamp column instead of letting
                # one SQL error abort the whole run.
                src_cursor.execute(
                    "SELECT 1 FROM information_schema.COLUMNS "
                    "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s "
                    "AND COLUMN_NAME = %s",
                    (table, timestamp_column),
                )
                if src_cursor.fetchone() is None:
                    continue

                quoted_table = quote_ident(table)
                query = f"SELECT * FROM {quoted_table}"
                params = None
                if last_sync_time is not None:
                    # ">=" rather than ">": timestamps usually have one-second
                    # resolution, so rows written in the same second as the
                    # previous watermark would otherwise be missed. Re-applying
                    # them is harmless because the write below is an upsert.
                    # The value is bound as a parameter, not formatted in.
                    query += f" WHERE {quote_ident(timestamp_column)} >= %s"
                    params = (last_sync_time,)
                src_cursor.execute(query, params)
                new_rows = src_cursor.fetchall()
                if not new_rows:
                    continue

                columns = [quote_ident(desc[0]) for desc in src_cursor.description]
                placeholders = ", ".join(["%s"] * len(columns))
                # Refresh every column on key collision so the target row
                # mirrors the source; this is also valid for single-column
                # tables, where updating only columns[1:] produced empty SQL.
                updates = ", ".join(f"{col}=VALUES({col})" for col in columns)
                insert_sql = (
                    f"INSERT INTO {quoted_table} ({', '.join(columns)}) "
                    f"VALUES ({placeholders}) "
                    f"ON DUPLICATE KEY UPDATE {updates}"
                )
                tgt_cursor.executemany(insert_sql, new_rows)
                target_conn.commit()
    except Exception as e:
        print(f"增量同步失败: {str(e)}")
        return None
    finally:
        if source_conn is not None:
            source_conn.close()
        if target_conn is not None:
            target_conn.close()
    return new_watermark.strftime('%Y-%m-%d %H:%M:%S')
2. 使用binlog实现实时同步
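对于需要实时同步的场景,可以使用python-mysql-replication库(`pip install mysql-replication`,导入名为pymysqlreplication)监听binlog。前提是源库已开启binlog且binlog_format=ROW,并且同步账号具有REPLICATION SLAVE和REPLICATION CLIENT权限: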
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)
from db_config import source_db, target_db


def _quote_ident(name):
    """Backtick-quote a MySQL identifier, doubling embedded backticks."""
    return "`" + str(name).replace("`", "``") + "`"


def _match_clause(values):
    """Build a NULL-safe (``<=>``) WHERE clause matching every column in ``values``."""
    return " AND ".join(f"{_quote_ident(col)} <=> %s" for col in values)


def _upsert_row(cursor, table, values):
    """Insert one row (column -> value dict); on key collision overwrite all columns."""
    columns = [_quote_ident(col) for col in values]
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{col}=VALUES({col})" for col in columns)
    cursor.execute(
        f"INSERT INTO {_quote_ident(table)} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {updates}",
        list(values.values()),
    )


def _delete_row(cursor, table, values):
    """Delete at most one target row whose columns all equal ``values``."""
    cursor.execute(
        f"DELETE FROM {_quote_ident(table)} WHERE {_match_clause(values)} LIMIT 1",
        list(values.values()),
    )


def _update_row(cursor, table, before, after):
    """Rewrite the row matching ``before`` to ``after``; upsert if no row changed."""
    assignments = ", ".join(f"{_quote_ident(col)} = %s" for col in after)
    cursor.execute(
        f"UPDATE {_quote_ident(table)} SET {assignments} "
        f"WHERE {_match_clause(before)} LIMIT 1",
        list(after.values()) + list(before.values()),
    )
    if cursor.rowcount == 0:
        # Row missing on the target (or already identical): an upsert of the
        # new image is idempotent either way.
        _upsert_row(cursor, table, after)


def binlog_sync():
    """Stream row events from the source binlog and apply them to the target.

    Blocks until interrupted with Ctrl+C. Only events for the source database
    (``source_db['database']``) are applied.
    NOTE(review): rows are matched on *all* column values, which assumes the
    values round-trip exactly (floats may not); matching on the primary key
    would be more robust. Also, depending on the pymysqlreplication/MySQL
    versions, the source may need binlog_row_metadata=FULL for row dicts to
    be keyed by real column names — confirm.
    """
    mysql_settings = {
        'host': source_db['host'],
        'port': source_db['port'],
        'user': source_db['user'],
        'passwd': source_db['password']
    }
    target_conn = pymysql.connect(**target_db)
    stream = BinLogStreamReader(
        mysql_settings,
        server_id=100,
        blocking=True,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        # Without this filter, changes from every database on the source
        # server would be replayed into the single target database.
        only_schemas=[source_db['database']],
    )
    try:
        with target_conn.cursor() as cursor:
            for binlogevent in stream:
                table = binlogevent.table
                for row in binlogevent.rows:
                    # Dispatch on the event class itself; comparing
                    # __class__.__name__ to lowercase strings never matched.
                    if isinstance(binlogevent, WriteRowsEvent):
                        _upsert_row(cursor, table, row["values"])
                    elif isinstance(binlogevent, UpdateRowsEvent):
                        _update_row(cursor, table, row["before_values"], row["after_values"])
                    elif isinstance(binlogevent, DeleteRowsEvent):
                        _delete_row(cursor, table, row["values"])
                # One commit per binlog event, so its rows land together.
                target_conn.commit()
    except KeyboardInterrupt:
        print("手动停止同步")
    finally:
        stream.close()
        target_conn.close()
四、高级优化技巧
1. 多线程加速同步
from concurrent.futures import ThreadPoolExecutor
import pymysql


def sync_table(table_name, source_config, target_config):
    """Copy one table (schema and data) from the source to the target DB.

    The target table is dropped, recreated from the source's
    ``SHOW CREATE TABLE`` output, and refilled with every source row. Each
    call opens and closes its own connections, because PyMySQL connections
    must not be shared between threads.

    Returns:
        True if the table was copied; False if an error occurred. The error
        is printed rather than raised, so one bad table does not stop the
        other workers.
    """

    def quote_ident(name):
        # Backtick-quote identifiers, doubling embedded backticks.
        return "`" + str(name).replace("`", "``") + "`"

    quoted_table = quote_ident(table_name)
    source_conn = None
    target_conn = None
    try:
        source_conn = pymysql.connect(**source_config)
        target_conn = pymysql.connect(**target_config)
        with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
            # Other workers may be dropping/recreating related tables at the
            # same time, so FK checks are disabled for this session only.
            tgt_cursor.execute("SET FOREIGN_KEY_CHECKS = 0")

            src_cursor.execute(f"SHOW CREATE TABLE {quoted_table}")
            create_table_sql = src_cursor.fetchone()[1]
            tgt_cursor.execute(f"DROP TABLE IF EXISTS {quoted_table}")
            tgt_cursor.execute(create_table_sql)

            src_cursor.execute(f"SELECT * FROM {quoted_table}")
            rows = src_cursor.fetchall()
            if rows:
                columns = [quote_ident(desc[0]) for desc in src_cursor.description]
                placeholders = ", ".join(["%s"] * len(columns))
                insert_sql = (
                    f"INSERT INTO {quoted_table} ({', '.join(columns)}) "
                    f"VALUES ({placeholders})"
                )
                tgt_cursor.executemany(insert_sql, rows)
            target_conn.commit()
        return True
    except Exception as e:
        print(f"表{table_name}同步失败: {str(e)}")
        return False
    finally:
        if source_conn is not None:
            source_conn.close()
        if target_conn is not None:
            target_conn.close()
def parallel_sync(max_workers=4):
    """Sync every source base table concurrently, one ``sync_table`` call per table.

    Args:
        max_workers: number of worker threads.

    Returns:
        Names of tables whose sync did not succeed, meaning ``sync_table``
        raised or returned False. The list is empty when all tables succeeded.
    """
    from concurrent.futures import ThreadPoolExecutor

    source_conn = pymysql.connect(**source_db)
    try:
        with source_conn.cursor() as cursor:
            # Views hold no data of their own and cannot be copied row-wise.
            cursor.execute("SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'")
            tables = [row[0] for row in cursor.fetchall()]
    finally:
        # Only needed for the table list; release it before the long copy.
        source_conn.close()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            table: executor.submit(sync_table, table, source_db, target_db)
            for table in tables
        }
    # Leaving the `with` block waits for every submitted future to finish.
    return [
        table
        for table, future in futures.items()
        if future.exception() is not None or future.result() is False
    ]
2. 数据校验机制
def verify_sync(source_config, target_config):
    """Compare per-table row counts between the source and target databases.

    Every source base table is checked. A table that is missing on the target
    is reported as a mismatch with a target count of None.

    Args:
        source_config: keyword arguments for ``pymysql.connect`` (source DB).
        target_config: keyword arguments for ``pymysql.connect`` (target DB).

    Returns:
        True if all counts match; otherwise prints the mismatches and
        returns False.
    """

    def quote_ident(name):
        # Backtick-quote identifiers, doubling embedded backticks.
        return "`" + str(name).replace("`", "``") + "`"

    mismatches = []
    source_conn = None
    target_conn = None
    try:
        source_conn = pymysql.connect(**source_config)
        target_conn = pymysql.connect(**target_config)
        with source_conn.cursor() as src_cursor, target_conn.cursor() as tgt_cursor:
            src_cursor.execute("SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'")
            tables = [row[0] for row in src_cursor.fetchall()]
            for table in tables:
                quoted_table = quote_ident(table)
                src_cursor.execute(f"SELECT COUNT(*) FROM {quoted_table}")
                src_count = src_cursor.fetchone()[0]
                try:
                    tgt_cursor.execute(f"SELECT COUNT(*) FROM {quoted_table}")
                except pymysql.err.ProgrammingError:
                    # PyMySQL raises this for error 1146 (table doesn't exist);
                    # record it instead of aborting the whole check.
                    mismatches.append((table, "目标表不存在", src_count, None))
                    continue
                tgt_count = tgt_cursor.fetchone()[0]
                if src_count != tgt_count:
                    mismatches.append((table, "记录数不匹配", src_count, tgt_count))
                # 可选:抽样校验数据内容...
    finally:
        if source_conn is not None:
            source_conn.close()
        if target_conn is not None:
            target_conn.close()

    if mismatches:
        print("发现数据不一致:")
        for item in mismatches:
            print(item)
        return False
    return True
五、生产环境建议
- 连接池管理:使用DBUtils或SQLAlchemy的连接池
- 断点续传:记录同步进度,支持中断后恢复
- 监控告警:集成Prometheus监控同步指标
- 安全加固:
- 使用SSH隧道加密传输
- 最小权限原则配置数据库用户
- 敏感信息使用环境变量或密钥管理服务
六、完整示例项目结构
mysql_sync/
├── config/
│ ├── db_config.py # 数据库配置
│ └── logger_config.py # 日志配置
├── core/
│ ├── sync_engine.py # 核心同步逻辑
│ ├── verifier.py # 数据校验
│ └── utils.py # 工具函数
├── scripts/
│ ├── full_sync.py # 全量同步脚本
│ └── incremental.py # 增量同步脚本
└── tests/
└── test_sync.py # 单元测试
结论
Python提供了灵活多样的方式来实现MySQL数据同步,从简单的全量复制到复杂的实时同步均可覆盖。根据实际业务需求,可以选择:
- 小数据量场景:使用PyMySQL直接操作
- 复杂业务场景:采用SQLAlchemy ORM
- 实时性要求高:结合binlog监听
- 大数据量场景:实现分表并行同步
建议在实际部署前进行充分的测试,特别是在数据一致性要求严格的场景下,务必添加完善的数据校验机制。
以上就是使用python高效实现mysql数据同步的几种方案的详细内容,更多关于python mysql数据同步的资料请关注代码网其它相关文章!
发表评论