一、先搞懂:python 与 mysql 数据类型的默认映射
pymysql会自动完成基础类型的转换,你先记住核心映射规则,大部分场景不用手动处理:
| mysql 数据类型 | python 类型 | 说明 |
|---|---|---|
| int/tinyint/bigint | int | 整数类型直接映射 |
| float/double/decimal | float/decimal | decimal 默认转 float(需精准则手动转) |
| varchar/text | str | 字符串自动转 unicode |
| date/datetime/timestamp | datetime.datetime | 日期时间自动转 datetime 对象 |
| boolean | bool | mysql 的 bool 本质是 tinyint (1) |
| null | none | 空值对应 python 的 none |
| blob | bytes | 二进制数据转 bytes |
二、手动处理数据类型转换的场景与示例
自动转换满足 80% 的场景,但遇到精度要求高、格式自定义、特殊类型时,需要手动干预,以下是高频场景:
1. 场景 1:decimal 类型(金额 / 精度数据)避免浮点误差
mysql 的 decimal 用于存储金额、汇率等精准数值,pymysql默认转 float 会丢失精度,需手动转decimal.decimal:
import pymysql
from decimal import decimal
# 1. 连接时指定转换规则(推荐)
config = {
'host': 'localhost',
'user': 'root',
'password': '你的密码',
'database': 'test_db',
'conv': pymysql.converters.conversions.copy() # 复制默认转换规则
}
# 重写decimal转换规则:转decimal而非float
config['conv'][pymysql.field_type.decimal] = decimal
config['conv'][pymysql.field_type.newdecimal] = decimal
# 2. 连接并查询
connection = pymysql.connect(**config)
cursor = connection.cursor(pymysql.cursors.dictcursor)
# 创建测试表(含decimal字段)
cursor.execute("""
create table if not exists order_info (
id int auto_increment primary key,
order_no varchar(20),
amount decimal(10,2) # 金额,10位总长度,2位小数
)
""")
# 插入数据(直接传decimal更精准)
cursor.execute("insert into order_info (order_no, amount) values (%s, %s)",
("ord202501", decimal("99.99")))
connection.commit()
# 查询验证(amount直接是decimal类型)
cursor.execute("select * from order_info where id = %s", (1,))
result = cursor.fetchone()
print(f"金额:{result['amount']},类型:{type(result['amount'])}") # 输出:99.99,类型:<class 'decimal.decimal'>
cursor.close()
connection.close()
2. 场景 2:日期时间类型的自定义格式转换
mysql 的 datetime 自动转datetime.datetime,但有时需要转字符串(如接口返回)或自定义格式:
import pymysql
from datetime import datetime
connection = pymysql.connect(
host='localhost',
user='root',
password='你的密码',
database='test_db',
cursorclass=pymysql.cursors.dictcursor
)
cursor = connection.cursor()
# 1. 写入:python datetime对象直接传入(自动转mysql datetime)
insert_time = datetime(2025, 1, 19, 10, 30, 0)
cursor.execute("insert into users (name, create_time) values (%s, %s)",
("王五", insert_time))
connection.commit()
# 2. 读取:datetime对象转自定义格式字符串
cursor.execute("select create_time from users where name = %s", ("王五",))
result = cursor.fetchone()
dt_obj = result['create_time']
# 转成"yyyy-mm-dd hh:mm:ss"字符串
dt_str = dt_obj.strftime("%y-%m-%d %h:%m:%s")
print(f"创建时间:{dt_str},类型:{type(dt_str)}") # 输出:2025-01-19 10:30:00,类型:str
# 3. 反向:字符串转datetime写入
dt_str2 = "2025-01-20 15:00:00"
dt_obj2 = datetime.strptime(dt_str2, "%y-%m-%d %h:%m:%s")
cursor.execute("update users set create_time = %s where name = %s",
(dt_obj2, "王五"))
connection.commit()
cursor.close()
connection.close()
3. 场景 3:json 类型的处理(mysql 5.7 + 支持)
mysql 的 json 类型需手动序列化 / 反序列化(pymysql不会自动转 python 字典):
import pymysql
import json
connection = pymysql.connect(
host='localhost',
user='root',
password='你的密码',
database='test_db',
cursorclass=pymysql.cursors.dictcursor
)
cursor = connection.cursor()
# 1. 创建含json字段的表
cursor.execute("""
create table if not exists user_profile (
id int auto_increment primary key,
user_id int,
profile json # json类型字段
)
""")
# 2. 写入:python字典转json字符串
profile_dict = {"hobby": ["篮球", "编程"], "address": {"city": "北京"}}
profile_json = json.dumps(profile_dict, ensure_ascii=false) # ensure_ascii=false保留中文
cursor.execute("insert into user_profile (user_id, profile) values (%s, %s)",
(1, profile_json))
connection.commit()
# 3. 读取:json字符串转python字典
cursor.execute("select profile from user_profile where user_id = %s", (1,))
result = cursor.fetchone()
profile_str = result['profile']
profile_dict2 = json.loads(profile_str)
print(f"爱好:{profile_dict2['hobby']},城市:{profile_dict2['address']['city']}")
# 输出:爱好:['篮球', '编程'],城市:北京
cursor.close()
connection.close()
4. 场景 4:blob / 二进制数据(如图片、文件)
mysql 的 blob 类型对应 python 的 bytes,读写时需处理二进制数据:
import pymysql
connection = pymysql.connect(
host='localhost',
user='root',
password='你的密码',
database='test_db',
cursorclass=pymysql.cursors.dictcursor
)
cursor = connection.cursor()
# 1. 创建含blob字段的表
cursor.execute("""
create table if not exists file_store (
id int auto_increment primary key,
file_name varchar(100),
file_data blob # 二进制数据字段
)
""")
# 2. 写入:读取文件转bytes写入
with open("test.jpg", "rb") as f:
file_bytes = f.read()
cursor.execute("insert into file_store (file_name, file_data) values (%s, %s)",
("test.jpg", file_bytes))
connection.commit()
# 3. 读取:bytes写入文件
cursor.execute("select file_data from file_store where file_name = %s", ("test.jpg",))
result = cursor.fetchone()
file_bytes2 = result['file_data']
with open("test_copy.jpg", "wb") as f:
f.write(file_bytes2)
print("文件已保存为test_copy.jpg")
cursor.close()
connection.close()
三、避坑要点
- 避免手动拼接类型转换:不要用
str(数值)拼接 sql,始终用%s参数占位符,pymysql会自动处理类型映射; - 空值处理:mysql 的 null 对应 python 的 none,写入时直接传 none,读取时判断是否为 none 避免报错;
- 编码问题:字符串类型(varchar/text)确保连接时指定
charset='utf8mb4',避免中文 / 特殊字符转码错误; - 超大整数处理:mysql 的 bigint 如果超过 python int 范围(如大于 2^63-1),需用
pymysql的long转换(python 3 已统一为 int,无需额外处理)。
总结
- 基础类型:
pymysql自动完成 int/str/bool/none 等类型映射,无需手动处理; - 高精度 / 特殊类型:decimal 转
decimal.decimal、json 用json模块序列化、blob 用 bytes 处理; - 日期时间:利用
datetime模块完成对象与字符串的互转,写入时直接传 datetime 对象更安全; - 核心原则:优先用参数化查询(% s),避免手动类型拼接,防止转换错误和 sql 注入。
到此这篇关于python中操作mysql数据库时如何处理数据类型转换问题的文章就介绍到这了,更多相关python操作mysql数据类型转换内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论