问题背景
在开发过程中,我们经常会遇到需要批量写入大量数据到 postgresql 数据库的场景。当使用传统的参数化插入语句时,可能会遇到如下错误:
pq: got 86575 parameters but postgresql only supports 65535 parameters
这是因为 postgresql 对单个查询的参数数量有限制(通常为 65535)。传统的解决方案是进行数据分片,分批写入数据库。但这种方法存在以下问题:
- 需要手动管理分片逻辑
- 多次数据库往返,网络开销大
- 事务管理复杂
- 性能不够理想
copy 协议解决方案
copy 协议简介
postgresql 的 copy 协议是专门为高效批量数据操作设计的二进制协议,具有以下优势:
- 高性能:避免了 sql 解析开销,直接使用二进制格式传输数据
- 低内存占用:流式处理,不需要在内存中构建庞大的 sql 语句
- 事务安全:可以在事务中执行,保证数据一致性
- 无参数限制:不受 postgresql 参数数量限制
二进制协议原理
copy 协议使用 postgresql 的前后端协议进行数据传输,其工作流程如下:
- 启动 copy 模式:客户端发送
copy from stdin命令 - 数据传输:使用二进制格式按行发送数据
- 结束传输:发送特定的结束标记
- 确认完成:服务器返回处理结果
二进制格式避免了文本解析的开销,直接使用网络字节序传输数据,大大提高了传输效率。
实战实现
依赖库
import (
"github.com/lib/pq"
"gorm.io/gorm"
)
核心实现代码
// batchcreate 批量创建消息接收者记录 - 使用 copy 协议
func (r *receiverrepo) batchcreate(ctx context.context, db *gorm.db, data []*define.wecommsgreceiver) (rowsaffected int64, err error) {
db = r.withtrace(ctx, db)
db = db.table(r.tablename())
if len(data) == 0 {
return 0, nil
}
// 过滤掉 nil 的数据
validdata := make([]*define.wecommsgreceiver, 0, len(data))
for _, item := range data {
if item != nil {
validdata = append(validdata, item)
}
}
if len(validdata) == 0 {
return 0, nil
}
// 获取底层 sql.db
sqldb := db.db()
// 开始事务
tx, err := sqldb.begintx(ctx, nil)
if err != nil {
return 0, fmt.errorf("开始事务失败:%+v", err)
}
defer func() {
if err != nil {
tx.rollback()
}
}()
// 创建 copy writer
stmt, err := tx.prepare(pq.copyin(r.tablename(), "send_log_id", "user_id", "status", "created_at", "updated_at"))
if err != nil {
return 0, fmt.errorf("准备 copy 语句失败:%+v", err)
}
defer stmt.close()
// 批量写入数据
for _, item := range validdata {
_, err = stmt.exec(item.sendlogid, item.userid, item.status, item.createdat, item.updatedat)
if err != nil {
return 0, fmt.errorf("写入数据失败:%+v", err)
}
}
// 执行 copy
_, err := stmt.exec()
if err != nil {
return 0, fmt.errorf("执行 copy 失败:%+v", err)
}
// 提交事务
if err = tx.commit(); err != nil {
return 0, fmt.errorf("提交事务失败:%+v", err)
}
rowsaffected = int64(len(validdata))
return rowsaffected, nil
}
代码说明
- 数据验证:首先过滤掉 nil 数据,确保数据有效性
- 事务管理:使用事务确保数据一致性,出错时自动回滚
- copy 准备:通过
pq.copyin准备 copy 语句,指定表名和列名 - 批量写入:遍历数据并执行
exec,但此时数据还在客户端缓冲区 - 最终执行:调用
stmt.exec()真正将数据发送到服务器 - 事务提交:提交事务,完成批量写入
完整测试用例
// 设置测试数据库
func setuptestdb() (*gorm.db, error) {
ctx := context.background()
postgres, err := infrastructure.dialpostgres(ctx, infrastructure.postgresconfig{
host: "host",
port: 5432,
username: "postgres",
password: "xxxxx",
database: "xxxxx",
})
if err != nil {
return nil, err
}
return postgres, nil
}
func setuplogger() factory.logfactory {
logger, _ := factory.newjsonfactory(factory.newlevel("info"), factory.newzapoption(factory.addcallerskip(0)))
return logger
}
func testreceiverrepo_batchcreate(t *testing.t) {
db, err := setuptestdb()
require.noerror(t, err)
defer db.close()
// 创建日志工厂
logger := setuplogger()
// 创建 repository 实例
repo := newreceiverrepository(db, logger)
// 准备测试数据 - 20000 条记录,使用负的 send_log_id 避免污染数据
testdata := make([]*define.wecommsgreceiver, 0, 20000)
now := time.now()
negativesendlogid := int64(-100000) // 使用负的 send_log_id
for i := 0; i < 20000; i++ {
testdata = append(testdata, &define.wecommsgreceiver{
sendlogid: negativesendlogid,
userid: "test_user_" + fmt.sprint(i),
status: 1,
createdat: now,
updatedat: now,
})
}
ctx := context.background()
// 执行批量插入
rowsaffected, err := repo.batchcreate(ctx, db, testdata)
// 验证结果
assert.noerror(t, err)
assert.equal(t, int64(20000), rowsaffected)
// 验证数据是否正确插入
var count int64
query := "select count(*) from wecom_msg_receiver where send_log_id < 0 and send_log_id >= ?"
err = db.raw(query, negativesendlogid).count(&count).error
assert.noerror(t, err)
assert.equal(t, int64(20000), count)
// 清理测试数据
deletequery := "delete from wecom_msg_receiver where send_log_id < 0 and send_log_id >= ?"
result := db.exec(deletequery, negativesendlogid)
assert.noerror(t, result.error)
assert.equal(t, int64(20000), result.rowsaffected)
// 验证清理是否成功
err = db.raw(query, negativesendlogid).count(&count).error
assert.noerror(t, err)
assert.equal(t, int64(0), count)
}
性能对比
在实际测试中,copy 协议相比传统分批插入有显著性能提升:
| 方案 | 20000 条数据耗时 | 内存占用 | 网络请求次数 |
|---|---|---|---|
| 传统分批插入 | ~15 秒 | 高 | 多次 |
| copy 协议 | ~2 秒 | 低 | 1 次 |
注意事项
- 错误处理:copy 协议中某行数据错误可能导致整个批量操作失败
- 数据类型:确保 go 数据类型与 postgresql 列类型匹配
- 连接池:长时间运行的 copy 操作会占用数据库连接
- 超时设置:对于大数据量,需要适当调整上下文超时时间
总结
通过使用 postgresql 的 copy 协议,我们成功解决了批量写入时的参数数量限制问题,同时大幅提升了性能。这种方法特别适合数据迁移、日志批量处理等需要高效写入大量数据的场景。
copy协议结合事务管理,既保证了数据一致性,又能提供了接近原生的写入性能,是postgresql批量数据操作的优选方案。
以上就是postgresql使用copy协议高效批量数据写入的实战指南的详细内容,更多关于postgresql copy批量数据写入的资料请关注代码网其它相关文章!
发表评论