当前位置: 代码网 > it编程>数据库>MsSqlserver > PostgreSQL使用COPY协议高效批量数据写入的实战指南

PostgreSQL使用COPY协议高效批量数据写入的实战指南

2025年11月20日 MsSqlserver 我要评论
问题背景在开发过程中,我们经常会遇到需要批量写入大量数据到 postgresql 数据库的场景。当使用传统的参数化插入语句时,可能会遇到如下错误:pq: got 86575 parameters bu

问题背景

在开发过程中,我们经常会遇到需要批量写入大量数据到 postgresql 数据库的场景。当使用传统的参数化插入语句时,可能会遇到如下错误:

pq: got 86575 parameters but postgresql only supports 65535 parameters

这是因为 postgresql 对单个查询的参数数量有限制(通常为 65535)。传统的解决方案是进行数据分片,分批写入数据库。但这种方法存在以下问题:

  • 需要手动管理分片逻辑
  • 多次数据库往返,网络开销大
  • 事务管理复杂
  • 性能不够理想

copy 协议解决方案

copy 协议简介

postgresql 的 copy 协议是专门为高效批量数据操作设计的二进制协议,具有以下优势:

  1. 高性能:避免了 sql 解析开销,直接使用二进制格式传输数据
  2. 低内存占用:流式处理,不需要在内存中构建庞大的 sql 语句
  3. 事务安全:可以在事务中执行,保证数据一致性
  4. 无参数限制:不受 postgresql 参数数量限制

二进制协议原理

copy 协议使用 postgresql 的前后端协议进行数据传输,其工作流程如下:

  1. 启动 copy 模式:客户端发送 copy from stdin 命令
  2. 数据传输:使用二进制格式按行发送数据
  3. 结束传输:发送特定的结束标记
  4. 确认完成:服务器返回处理结果

二进制格式避免了文本解析的开销,直接使用网络字节序传输数据,大大提高了传输效率。

实战实现

依赖库

import (
    "github.com/lib/pq"
    "gorm.io/gorm"
)

核心实现代码

// batchcreate 批量创建消息接收者记录 - 使用 copy 协议
func (r *receiverrepo) batchcreate(ctx context.context, db *gorm.db, data []*define.wecommsgreceiver) (rowsaffected int64, err error) {
    db = r.withtrace(ctx, db)
    db = db.table(r.tablename())

    if len(data) == 0 {
        return 0, nil
    }

    // 过滤掉 nil 的数据
    validdata := make([]*define.wecommsgreceiver, 0, len(data))
    for _, item := range data {
        if item != nil {
            validdata = append(validdata, item)
        }
    }
    if len(validdata) == 0 {
        return 0, nil
    }

    // 获取底层 sql.db
    sqldb := db.db()
    
    // 开始事务
    tx, err := sqldb.begintx(ctx, nil)
    if err != nil {
        return 0, fmt.errorf("开始事务失败:%+v", err)
    }
    defer func() {
        if err != nil {
            tx.rollback()
        }
    }()

    // 创建 copy writer
    stmt, err := tx.prepare(pq.copyin(r.tablename(), "send_log_id", "user_id", "status", "created_at", "updated_at"))
    if err != nil {
        return 0, fmt.errorf("准备 copy 语句失败:%+v", err)
    }
    defer stmt.close()

    // 批量写入数据
    for _, item := range validdata {
        _, err = stmt.exec(item.sendlogid, item.userid, item.status, item.createdat, item.updatedat)
        if err != nil {
            return 0, fmt.errorf("写入数据失败:%+v", err)
        }
    }

    // 执行 copy
    _, err := stmt.exec()
    if err != nil {
        return 0, fmt.errorf("执行 copy 失败:%+v", err)
    }

    // 提交事务
    if err = tx.commit(); err != nil {
        return 0, fmt.errorf("提交事务失败:%+v", err)
    }
    
    rowsaffected = int64(len(validdata))
    return rowsaffected, nil
}

代码说明

  1. 数据验证:首先过滤掉 nil 数据,确保数据有效性 
  2. 事务管理:使用事务确保数据一致性,出错时自动回滚
  3. copy 准备:通过 pq.copyin 准备 copy 语句,指定表名和列名
  4. 批量写入:遍历数据并执行 exec,但此时数据还在客户端缓冲区
  5. 最终执行:调用 stmt.exec() 真正将数据发送到服务器
  6. 事务提交:提交事务,完成批量写入

完整测试用例

// 设置测试数据库
func setuptestdb() (*gorm.db, error) {
    ctx := context.background()
    postgres, err := infrastructure.dialpostgres(ctx, infrastructure.postgresconfig{
        host:     "host",
        port:     5432,
        username: "postgres",
        password: "xxxxx",
        database: "xxxxx",
    })
    if err != nil {
        return nil, err
    }

    return postgres, nil
}

func setuplogger() factory.logfactory {
    logger, _ := factory.newjsonfactory(factory.newlevel("info"), factory.newzapoption(factory.addcallerskip(0)))
    return logger
}

func testreceiverrepo_batchcreate(t *testing.t) {
    db, err := setuptestdb()
    require.noerror(t, err)
    defer db.close()

    // 创建日志工厂
    logger := setuplogger()

    // 创建 repository 实例
    repo := newreceiverrepository(db, logger)

    // 准备测试数据 - 20000 条记录,使用负的 send_log_id 避免污染数据
    testdata := make([]*define.wecommsgreceiver, 0, 20000)
    now := time.now()
    negativesendlogid := int64(-100000) // 使用负的 send_log_id

    for i := 0; i < 20000; i++ {
        testdata = append(testdata, &define.wecommsgreceiver{
            sendlogid: negativesendlogid,
            userid:    "test_user_" + fmt.sprint(i),
            status:    1,
            createdat: now,
            updatedat: now,
        })
    }

    ctx := context.background()

    // 执行批量插入
    rowsaffected, err := repo.batchcreate(ctx, db, testdata)

    // 验证结果
    assert.noerror(t, err)
    assert.equal(t, int64(20000), rowsaffected)

    // 验证数据是否正确插入
    var count int64
    query := "select count(*) from wecom_msg_receiver where send_log_id < 0 and send_log_id >= ?"
    err = db.raw(query, negativesendlogid).count(&count).error
    assert.noerror(t, err)
    assert.equal(t, int64(20000), count)

    // 清理测试数据
    deletequery := "delete from wecom_msg_receiver where send_log_id < 0 and send_log_id >= ?"
    result := db.exec(deletequery, negativesendlogid)
    assert.noerror(t, result.error)
    assert.equal(t, int64(20000), result.rowsaffected)

    // 验证清理是否成功
    err = db.raw(query, negativesendlogid).count(&count).error
    assert.noerror(t, err)
    assert.equal(t, int64(0), count)
}

性能对比

在实际测试中,copy 协议相比传统分批插入有显著性能提升:

方案20000 条数据耗时内存占用网络请求次数
传统分批插入~15 秒多次
copy 协议~2 秒1 次

注意事项

  1. 错误处理:copy 协议中某行数据错误可能导致整个批量操作失败
  2. 数据类型:确保 go 数据类型与 postgresql 列类型匹配
  3. 连接池:长时间运行的 copy 操作会占用数据库连接
  4. 超时设置:对于大数据量,需要适当调整上下文超时时间

总结

通过使用 postgresql 的 copy 协议,我们成功解决了批量写入时的参数数量限制问题,同时大幅提升了性能。这种方法特别适合数据迁移、日志批量处理等需要高效写入大量数据的场景。

copy协议结合事务管理,既保证了数据一致性,又能提供了接近原生的写入性能,是postgresql批量数据操作的优选方案。

以上就是postgresql使用copy协议高效批量数据写入的实战指南的详细内容,更多关于postgresql copy批量数据写入的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com