前言
本文演示了一个数据库事务处理的典型案例:转账业务,可以通过python和pymysql实现一个包含异常回滚机制的转账场景,当系统在扣款后、加款前发生故障时,能够自动回滚到事务开始前的状态,确保资金不会丢失
代码重点展示了事务控制的关键步骤:关闭自动提交、执行sql操作、异常处理中的回滚机制,以及使用innodb引擎的必要性,通过模拟系统崩溃的场景,验证了事务的acid特性,最终确保alice和bob的账户余额保持正确一致,
这个例子模拟了一个经典的“转账”场景:a 给 b 转钱,如果在扣款后、收款前系统发生错误(比如断电、代码异常),必须让数据回到转账前的状态,保证钱不凭空消失。
环境准备
你需要安装 pymysql 库:
pip install pymysql
代码实现
import pymysql
import sys
# 数据库配置(请根据你的实际情况修改)
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'test_db',
'charset': 'utf8mb4'
}
def setup_database(cursor):
"""初始化测试表和数据"""
try:
cursor.execute("drop table if exists accounts")
cursor.execute("""
create table accounts (
id int auto_increment primary key,
name varchar(50),
balance decimal(10, 2)
) engine=innodb default charset=utf8mb4
""")
# 插入初始数据:alice有1000元,bob有500元
cursor.execute("insert into accounts (name, balance) values ('alice', 1000.00)")
cursor.execute("insert into accounts (name, balance) values ('bob', 500.00)")
print("✅ 数据库初始化完成:alice=1000, bob=500")
except exception as e:
print(f"❌ 初始化失败: {e}")
def transfer_money_with_rollback(from_user, to_user, amount):
"""
模拟转账业务,并在发生错误时回滚
"""
connection = none
try:
# 1. 建立连接
connection = pymysql.connect(**db_config)
# 2. 关键步骤:关闭自动提交,开启事务
connection.autocommit(false)
with connection.cursor() as cursor:
# --- 步骤一:扣款 ---
print(f"\n💰 正在从 {from_user} 扣除 {amount} 元...")
sql_deduct = "update accounts set balance = balance - %s where name = %s"
cursor.execute(sql_deduct, (amount, from_user))
# 模拟查询扣款后的余额(仅为了演示,实际业务中可能不需要)
cursor.execute("select balance from accounts where name = %s", (from_user,))
result = cursor.fetchone()
print(f" 👉 扣款后查询 {from_user} 余额: {result[0]} (此时数据在内存/redo log中,未永久落盘)")
# --- 步骤二:模拟突发异常 ---
# 比如:此时服务器断电、网络中断、或者代码逻辑错误
print("⚠️ 模拟系统崩溃:准备加款时发生除零错误!")
error_simulation = 1 / 0 # 故意制造一个异常
# --- 步骤三:加款(正常情况下会执行,但上面报错了就不会走到这) ---
sql_add = "update accounts set balance = balance + %s where name = %s"
cursor.execute(sql_add, (amount, to_user))
# 3. 如果一切顺利,提交事务
connection.commit()
print("✅ 转账成功,事务已提交!")
except exception as e:
print(f"\n❌ 发生严重错误: {e}")
if connection:
# 4. 核心:发生任何异常,回滚所有操作
print("🔄 正在执行回滚操作 (rollback)...")
connection.rollback()
print("🛡️ 回滚成功!数据已恢复到事务开始前的状态。")
finally:
if connection:
# 5. 恢复自动提交模式并关闭连接
connection.autocommit(true)
connection.close()
def check_final_balance():
"""检查最终结果"""
conn = pymysql.connect(**db_config)
with conn.cursor() as cursor:
cursor.execute("select name, balance from accounts")
results = cursor.fetchall()
print("\n----- 最终账户余额 -----")
for row in results:
print(f"用户: {row[0]}, 余额: {row[1]}")
print("------------------------")
# 验证结果
alice_balance = results[0][1] if results[0][0] == 'alice' else results[1][1]
bob_balance = results[0][1] if results[0][0] == 'bob' else results[1][1]
assert alice_balance == 1000, f"alice余额错误!期望1000,实际{alice_balance}"
assert bob_balance == 500, f"bob余额错误!期望500,实际{bob_balance}"
print("🎉 验证通过:数据一致,回滚生效!")
conn.close()
if __name__ == "__main__":
# 初始化
conn_init = pymysql.connect(**db_config)
with conn_init.cursor() as cur:
setup_database(cur)
conn_init.commit()
conn_init.close()
# 执行带回滚的转账
transfer_money_with_rollback('alice', 'bob', 200)
# 检查最终数据是否正确回滚
check_final_balance()
代码讲解重点
connection.autocommit(false):这是事务的开关。默认情况下 mysql 是自动提交的(每句 sql 都是一个事务),关掉它才能把多步操作打包成一个整体。try...except...结构:业务逻辑必须放在try里。connection.rollback():这是“后悔药”。一旦进入except块,调用此方法会撤销从autocommit(false)之后的所有未提交更改。connection.commit():这是“确认键”。只有执行了这个,数据才真正写入磁盘(配合 redo log 和 binlog)。- engine=innodb:注意建表时指定了引擎为 innodb。如果是 myisam 引擎,它不支持事务,
rollback会失效!这是面试常考点。
运行这个脚本,你会看到虽然执行了扣款 sql,但因为中间报错触发了回滚,最后 alice 的钱还是 1000,bob 还是 500,完美保证了数据的一致性。
到此这篇关于python实现mysql错误回滚机制的实战代码的文章就介绍到这了,更多相关python mysql错误回滚内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论