接上篇
上篇讲了 ksycopg2 的安装配置、连接管理、高可用和连接池。这篇接着说怎么执行 sql、处理结果集,以及批量操作、copy 命令这些实用功能。
一、执行 sql 语句
1.1 基础查询
先看一个完整的查询流程:
import ksycopg2
conn = ksycopg2.connect(
database='test',
user='system',
password='123456',
host='127.0.0.1',
port='54321'
)
cur = conn.cursor()
# 执行查询
cur.execute("select id, name from test_user where age > %s", (18,))
# 获取结果
rows = cur.fetchall()
for row in rows:
print(f"id: {row[0]}, name: {row[1]}")
cur.close()
conn.close()
几个要点:
- 先用
cursor()创建游标 execute()执行 sqlfetchall()拿全部结果- 用完记得关闭游标和连接
1.2 获取结果集的几种方式
fetchone():一条一条拿
cur.execute("select id, name from test_user")
while true:
row = cur.fetchone()
if row is none:
break
print(row)
适合处理大结果集,不会一次性把所有数据加载到内存。
fetchmany():分批拿
cur.execute("select id, name from test_user")
while true:
rows = cur.fetchmany(100) # 一次拿100条
if not rows:
break
for row in rows:
process_row(row)
比 fetchone 快,又不会像 fetchall 那样吃内存。
fetchall():一次性全拿
cur.execute("select id, name from test_user")
rows = cur.fetchall() # 数据量小的时候用
for row in rows:
print(row)
数据量小的时候方便,几万条以上就别用了。
1.3 获取列信息
有时候需要知道查询结果有哪些列、什么类型,可以用 cursor.description:
cur.execute("select id, name, created_at from test_user")
for col in cur.description:
print(f"列名: {col.name}, 类型码: {col.type_code}, 长度: {col.internal_size}")
输出示例:
列名: id, 类型码: 23, 长度: 4
列名: name, 类型码: 1043, 长度: -1
列名: created_at, 类型码: 1114, 长度: 8
类型码是 kingbase 内部的数据类型 oid,一般用不到,但调试时有用。
1.4 执行非查询 sql
insert、update、delete 这些不返回结果集的 sql,直接用 execute() 执行就行:
cur = conn.cursor()
# 插入
cur.execute("insert into test_user (id, name) values (%s, %s)", (1, '张三'))
# 更新
cur.execute("update test_user set name = %s where id = %s", ('李四', 1))
# 删除
cur.execute("delete from test_user where id = %s", (1,))
# 注意:需要提交事务
conn.commit()
cur.close()
千万别忘了 commit(),否则数据不会真正写入。
二、参数传递与防 sql 注入
ksycopg2 用占位符 %s 传递参数,会自动处理转义,不用自己拼接字符串。
# 正确写法:参数单独传递
cur.execute(
"insert into test_user (id, name) values (%s, %s)",
(1, "张三")
)
# 错误写法:自己拼接字符串,有 sql 注入风险
cur.execute(f"insert into test_user (id, name) values ({id}, '{name}')")
2.1 位置占位符
# 用元组传参
cur.execute(
"select * from test_user where age > %s and name like %s",
(18, '%张%')
)
2.2 命名占位符
# 用字典传参,参数多的时候更清晰
cur.execute(
"select * from test_user where age > %(min_age)s and name like %(name_pattern)s",
{'min_age': 18, 'name_pattern': '%张%'}
)
三、批量操作
3.1 executemany() 的问题
ksycopg2 提供了 executemany(),但它的实现方式是循环调用 execute(),一次发一条 sql,性能提升不大。
data = [(1, '张三'), (2, '李四'), (3, '王五')]
cur.executemany("insert into test_user (id, name) values (%s, %s)", data)
conn.commit()
数据量小的时候用用还行,大批量插入不建议。
3.2 execute_batch() 批量执行
ksycopg2.extras.execute_batch() 把多条 sql 分成若干组,每组一次发给数据库,减少网络往返次数。
from ksycopg2 import extras
data = [(1, '张三'), (2, '李四'), (3, '王五'), ...] # 几百条数据
extras.execute_batch(
cur,
"insert into test_user (id, name) values (%s, %s)",
data,
page_size=100 # 每100条发一次
)
conn.commit()
page_size 控制每组多少条。太小网络交互多,太大 sql 语句太长,100-500 之间比较合适。
3.3 execute_values() 一条 sql 插多行
execute_values() 把所有数据拼成一条 insert 语句,效率最高:
from ksycopg2 import extras
data = [(1, '张三'), (2, '李四'), (3, '王五')]
extras.execute_values(
cur,
"insert into test_user (id, name) values %s",
data,
page_size=100
)
conn.commit()
生成的 sql 类似:
insert into test_user (id, name) values (1, '张三'), (2, '李四'), (3, '王五')
一次性插入几千条数据时,execute_values 比 execute_batch 快不少。
三种批量插入方式对比(插入 1 万条数据测试):
| 方式 | 网络往返 | 速度 | 适用场景 |
|---|---|---|---|
| executemany | 1万次 | 慢 | 少量数据 |
| execute_batch | 100次 | 中 | 中等数据量 |
| execute_values | 1次 | 快 | 大批量数据 |
四、调用存储过程
4.1 调用函数
# 先创建函数
cur.execute("""
create or replace function add_user(p_id integer, p_name text)
returns text as $$
begin
insert into test_user (id, name) values (p_id, p_name);
return 'success';
end;
$$ language plpgsql;
""")
# 调用函数
cur.callproc('add_user', (10, 'test_user'))
result = cur.fetchone()
print(result) # ('success',)
conn.commit()
4.2 调用存储过程
调用存储过程需要用 execute() 配合 call 语句:
-- 创建存储过程
create or replace procedure update_user_name(
p_id integer,
p_new_name text
)
language plpgsql
as $$
begin
update test_user set name = p_new_name where id = p_id;
end;
$$;# 调用存储过程
cur.execute("call update_user_name(%s, %s)", (1, '新名字'))
conn.commit()
五、copy 命令:高效数据导入导出
copy 是 kingbase 提供的快速数据导入导出方式,比 insert 快很多。
5.1 copy_from():从文件导入
cur = conn.cursor()
with open('data.csv', 'r') as f:
cur.copy_from(
file=f,
table='test_user',
columns=('id', 'name'),
sep=',' # 列分隔符
)
conn.commit()
默认分隔符是制表符 \t,csv 文件需要指定 sep=','。null 值默认用 \n 表示,也可以改:
cur.copy_from(f, 'test_user', columns=('id', 'name'), sep=',', null='null')
5.2 copy_to():导出到文件
with open('export.csv', 'w') as f:
cur.copy_to(
file=f,
table='test_user',
columns=('id', 'name'),
sep=','
)
conn.commit()
5.3 copy_expert():自定义 copy
copy_expert 最灵活,可以写完整的 copy 语句:
copy_sql = """
copy test_user(id, name)
to stdout
with csv header delimiter as ','
"""
with open('export_with_header.csv', 'w') as f:
cur.copy_expert(sql=copy_sql, file=f)
用在数据迁移、日志导出、报表生成这些场景,效率比 select 一行行写高得多。
六、大对象处理
kingbase 支持 blob(二进制大对象)和 clob(字符大对象)。ksycopg2 可以处理这些类型。
import ksycopg2
conn = ksycopg2.connect(
database='test', user='system',
password='123456', host='127.0.0.1', port='54321'
)
cur = conn.cursor()
# 建表
cur.execute('drop table if exists test_lob')
cur.execute('''
create table test_lob (
id integer,
b blob,
c clob,
ba bytea
)
''')
# 准备测试数据
ba_data = bytearray("中文测试bytearray", "utf8")
b_data = bytes('中文测试bytes' * 2, "utf8")
str_data = '中文str' * 4
# 插入
cur.execute(
"insert into test_lob values (%s, %s, %s, %s)",
(1, ba_data, ba_data, ba_data)
)
cur.execute(
"insert into test_lob values (%s, %s, %s, %s)",
(2, b_data, b_data, b_data)
)
cur.execute(
"insert into test_lob values (%s, %s, %s, %s)",
(3, str_data, str_data, str_data)
)
conn.commit()
# 读取
cur.execute("select id, b, c, ba from test_lob")
rows = cur.fetchall()
for row in rows:
for cell in row:
# bytea 类型返回 memoryview,需要转换
if isinstance(cell, memoryview):
print(cell[:].tobytes().decode('utf8'), end=" ")
else:
print(cell, end=" ")
print()
cur.close()
conn.close()
三个注意事项:
bytea类型在 python 3 里返回memoryview,用tobytes()转换- 大对象不建议频繁读写,性能不好
- 超大文件(几十 mb 以上)建议存文件系统,数据库只存路径
七、动态 sql 生成
ksycopg2.sql 模块解决了一个头疼的问题:表名、字段名这类标识符不能直接用参数传递。
from ksycopg2 import sql
# 错误写法:标识符不能参数化
cur.execute("select * from %s where id = %s", ('test_user', 1))
# 正确写法:用 sql 模块
cur.execute(
sql.sql("select * from {} where id = %s").format(sql.identifier('test_user')),
(1,)
)
7.1 拼接带标识符的 sql
table_name = 'test_user'
id_column = 'user_id'
name_column = 'user_name'
query = sql.sql("select {id}, {name} from {table} where {id} = %s").format(
id=sql.identifier(id_column),
name=sql.identifier(name_column),
table=sql.identifier(table_name)
)
cur.execute(query, (100,))
7.2 使用字面值
from ksycopg2 import sql
# 把 python 值直接转成 sql 字面量
value = sql.literal("it's a test")
cur.execute(
sql.sql("select {}").format(value)
)
# 生成: select 'it''s a test'
7.3 动态 in 查询
ids = [1, 2, 3, 4, 5]
placeholders = ','.join(['%s'] * len(ids))
cur.execute(
f"select * from test_user where id in ({placeholders})",
ids
)
八、错误处理
import ksycopg2
from ksycopg2 import operationalerror, integrityerror
try:
conn = ksycopg2.connect(database='test', user='system', password='123456')
cur = conn.cursor()
cur.execute("insert into test_user (id, name) values (%s, %s)", (1, '张三'))
conn.commit()
except integrityerror as e:
print(f"违反唯一约束: {e}")
conn.rollback()
except operationalerror as e:
print(f"连接或操作错误: {e}")
except exception as e:
print(f"其他错误: {e}")
conn.rollback()
finally:
cur.close()
conn.close()
常见异常类型:
integrityerror:违反约束(主键重复、外键不存在等)operationalerror:连接断开、语法错误等programmingerror:表不存在、列不存在等
九、完整示例:从连接到批量插入
import ksycopg2
from ksycopg2 import extras
def main():
conn = none
try:
conn = ksycopg2.connect(
database='test',
user='system',
password='123456',
host='127.0.0.1',
port='54321'
)
conn.autocommit = false
cur = conn.cursor()
# 建表
cur.execute('drop table if exists test_user')
cur.execute('''
create table test_user (
id integer primary key,
name varchar(100),
created_at timestamp default current_timestamp
)
''')
# 批量插入
data = [(i, f'user_{i}') for i in range(1, 1001)]
extras.execute_values(
cur,
"insert into test_user (id, name) values %s",
data,
page_size=200
)
# 查询
cur.execute("select count(*) from test_user")
count = cur.fetchone()[0]
print(f"插入了 {count} 条数据")
conn.commit()
cur.close()
except exception as e:
print(f"操作失败: {e}")
if conn:
conn.rollback()
finally:
if conn:
conn.close()
if __name__ == "__main__":
main()
十、总结
下篇主要讲了这几个方面:
- sql 执行:
execute()执行查询和非查询语句,fetchone/fetchmany/fetchall获取结果 - 参数传递:用
%s占位符,不要拼接字符串 - 批量操作:
execute_values性能最佳,大批量插入首选 - copy 命令:数据导入导出最快的方式
- 大对象:blob/clob/bytea 的处理方式
- 动态 sql:表名、字段名用
sql模块处理 - 错误处理:区分异常类型,正确处理事务回滚
两篇合在一起,覆盖了 python 操作金仓数据库的常用场景。从连接到高可用,从 sql 执行到批量操作,基本够用了。如果遇到官方文档没覆盖的场景,建议去金仓官网看最新的驱动包和示例。
到此这篇关于python金仓数据库操作之sql执行,批量操作与扩展功能的完全指南的文章就介绍到这了,更多相关python操作金仓数据库内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论