前言
开发目的:
提高百万级数据插入效率。
采取方案:
利用threadpooltaskexecutor多线程批量插入。
采用技术:
- springboot2.1.1
- mybatisplus3.0.6
- swagger2.5.0
- lombok1.18.4
- postgresql
- threadpooltaskexecutor
具体实现细节
application-dev.properties添加线程池配置信息
# 异步线程配置 # 配置核心线程数 async.executor.thread.core_pool_size = 30 # 配置最大线程数 async.executor.thread.max_pool_size = 30 # 配置队列大小 async.executor.thread.queue_capacity = 99988 # 配置线程池中的线程的名称前缀 async.executor.thread.name.prefix = async-importdb-
spring容器注入线程池bean对象
@configuration
@enableasync
@slf4j
public class executorconfig {
@value("${async.executor.thread.core_pool_size}")
private int corepoolsize;
@value("${async.executor.thread.max_pool_size}")
private int maxpoolsize;
@value("${async.executor.thread.queue_capacity}")
private int queuecapacity;
@value("${async.executor.thread.name.prefix}")
private string nameprefix;
@bean(name = "asyncserviceexecutor")
public executor asyncserviceexecutor() {
log.warn("start asyncserviceexecutor");
//在这里修改
threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
//配置核心线程数
executor.setcorepoolsize(corepoolsize);
//配置最大线程数
executor.setmaxpoolsize(maxpoolsize);
//配置队列大小
executor.setqueuecapacity(queuecapacity);
//配置线程池中的线程的名称前缀
executor.setthreadnameprefix(nameprefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
//执行初始化
executor.initialize();
return executor;
}
}创建异步线程 业务类
@service
@slf4j
public class asyncserviceimpl implements asyncservice {
@override
@async("asyncserviceexecutor")
public void executeasync(list<logoutputresult> logoutputresults, logoutputresultmapper logoutputresultmapper, countdownlatch countdownlatch) {
try{
log.warn("start executeasync");
//异步线程要做的事情
logoutputresultmapper.addlogoutputresultbatch(logoutputresults);
log.warn("end executeasync");
}finally {
countdownlatch.countdown();// 很关键, 无论上面程序是否异常必须执行countdown,否则await无法释放
}
}
}创建多线程批量插入具体业务方法
@override
public int testmultithread() {
list<logoutputresult> logoutputresults = gettestdata();
//测试每100条数据插入开一个线程
list<list<logoutputresult>> lists = converthandler.splitlist(logoutputresults, 100);
countdownlatch countdownlatch = new countdownlatch(lists.size());
for (list<logoutputresult> listsub:lists) {
asyncservice.executeasync(listsub, logoutputresultmapper,countdownlatch);
}
try {
countdownlatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
// 这样就可以在下面拿到所有线程执行完的集合结果
} catch (exception e) {
log.error("阻塞异常:"+e.getmessage());
}
return logoutputresults.size();
}模拟2000003 条数据进行测试
对了,最近我整理了上百本电子书/软件/视频以及面试题,还在持续更新中,全部免费,文档地址:
https://r86oxhhvu2.feishu.cn/wiki/zmq0wjeffirrbvk9nefcf7uknsj

多线程 测试 2000003 耗时如下:耗时1.67分钟


本次开启30个线程,截图如下:

单线程测试2000003 耗时如下:耗时5.75分钟


检查多线程入库的数据,检查是否存在重复入库的问题:
根据id分组,查看是否有id重复的数据,通过sql语句检查,没有发现重复入库的问题

检查数据完整性:
通过sql语句查询,多线程录入数据完整

测试结果
不同线程数测试:


总结
通过以上测试案列,同样是导入2000003 条数据,多线程耗时1.67分钟,单线程耗时5.75分钟。通过对不同线程数的测试,发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:
cpu核心数量*2 +2 个线程。
以上就是springboot利用threadpooltaskexecutor批量插入百万级数据的具体实现的详细内容,更多关于springboot threadpooltaskexecutor插入数据的资料请关注代码网其它相关文章!
发表评论