Project scenario
A list containing a large amount of data needs to be inserted into the database in batches.
Solution
Split the list into sublists, then insert them into the database in batches using multiple threads.
1. Entity class
package com.test.entity;

import lombok.Data;

@Data
public class TestEntity {

    private String id;

    private String name;
}
2. Mapper
If the data volume is small, the foreach tag is sufficient. If the data volume is large, the BATCH executor mode is recommended. (A usage sketch for the foreach variant follows the interface below.)
package com.test.mapper;

import java.util.List;

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;

import com.test.entity.TestEntity;

public interface TestMapper {

    /**
     * 1. Single-row insert, meant to be used with ExecutorType.BATCH,
     * which enables batch processing on the session.
     * Recommended when the data volume is large.
     */
    @Insert("insert into test(id, name) "
            + " values"
            + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    void testInsert(TestEntity testEntity);

    /**
     * 2. Multi-row insert built with the foreach tag.
     * Suitable when the data volume is small.
     * Note: dynamic SQL inside an annotation must be wrapped in script tags.
     */
    @Insert("<script>"
            + " insert into test(id, name) values "
            + " <foreach collection='list' item='item' index='index' separator=','>"
            + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
            + " </foreach>"
            + "</script>")
    void testBatchInsert(@Param("list") List<TestEntity> list);
}
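The article only calls testInsert later on. For completeness, here is a minimal sketch of how the foreach-based testBatchInsert could be called for a small list. The SmallBatchService class is hypothetical and not part of the original code; it assumes the mapper interfaces are registered with mybatis-spring (for example via @MapperScan) so they can be injected directly.
package com.test.service;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

// Hypothetical helper, not part of the original article: calls the
// foreach-based mapper method once for a modest amount of data.
@Service
public class SmallBatchService {

    @Autowired
    private TestMapper testMapper;

    public void saveSmallBatch(List<TestEntity> entities) {
        // A single multi-row INSERT is generated for the whole list.
        // Fine for a few hundred rows; for very large lists the SQL statement
        // grows huge, which is why the BATCH-session approach below is preferred.
        testMapper.testBatchInsert(entities);
    }
}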
3. Register a thread pool bean in the Spring container
package com.test.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     * Custom thread pool for async tasks
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // core pool size
        executor.setCorePoolSize(50);
        // maximum pool size
        executor.setMaxPoolSize(500);
        // queue capacity
        executor.setQueueCapacity(300);
        // name prefix for threads created by this pool
        executor.setThreadNamePrefix("testExecutor-");
        // rejection policy: what to do with new tasks once the pool has reached max size
        // CALLER_RUNS: the task is not run on a pool thread but on the caller's own thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // when shutdown() is called, wait for submitted tasks to finish before closing
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // maximum time to wait for tasks to finish on shutdown
        executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}
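For readers unsure how corePoolSize, queueCapacity, and maxPoolSize interact, the configuration above corresponds roughly to the plain JDK executor sketched below. This is an illustration only, not part of the original article: Spring's ThreadPoolTaskExecutor additionally manages the bean lifecycle and thread naming, and its default keep-alive of 60 seconds is assumed here.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustration only: approximate JDK equivalent of the Spring configuration above.
// A submitted task first fills the 50 core threads, then the 300-slot queue,
// then the pool grows toward 500 threads; beyond that, CallerRunsPolicy makes
// the submitting thread run the task itself, which throttles further submissions.
public class JdkExecutorSketch {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                50,                              // corePoolSize
                500,                             // maxPoolSize
                60L, TimeUnit.SECONDS,           // keep-alive for idle non-core threads
                new LinkedBlockingQueue<>(300),  // queueCapacity
                new ThreadPoolExecutor.CallerRunsPolicy());
        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}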
4. Create the async service class
package com.test.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

@Service
public class AsyncService {

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
        // Open a session with batch processing enabled. Because this runs on multiple
        // threads, each thread gets its own session and therefore its own transaction.
        SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
        try {
            TestMapper mapper = session.getMapper(TestMapper.class);
            // work done by the async thread
            for (int i = 0; i < logOutputResults.size(); i++) {
                System.out.println(Thread.currentThread().getName() + " thread: " + logOutputResults.get(i));
                TestEntity test = new TestEntity();
                //test.set()
                //.............
                // batch save
                mapper.testInsert(test);
                // flush every 1000 records to keep the pending batch from exhausting memory
                if (i % 1000 == 0) {
                    session.flushStatements();
                }
            }
            // flush the remaining statements and commit the transaction
            session.flushStatements();
            session.commit();
        } finally {
            // Crucial: count down even if the code above threw an exception,
            // otherwise await() would never be released.
            countDownLatch.countDown();
            if (session != null) {
                session.close();
            }
        }
    }
}
5. Split the list and call the async service method
package com.test.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

@Service
public class TestService {

    @Resource
    private AsyncService asyncService;

    public int testMultiThread() {
        List<String> logOutputResults = getTestData();
        // the list split into one sublist per thread
        List<List<String>> lists = splitList(logOutputResults);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<String> listSub : lists) {
            asyncService.executeAsync(listSub, countDownLatch);
        }
        try {
            // Block until all async tasks have finished, so that the combined
            // result of every thread is available below.
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logOutputResults.size();
    }

    public List<String> getTestData() {
        List<String> logOutputResults = new ArrayList<String>();
        for (int i = 0; i < 3000; i++) {
            logOutputResults.add("test data " + i);
        }
        return logOutputResults;
    }

    public List<List<String>> splitList(List<String> logOutputResults) {
        List<List<String>> results = new ArrayList<List<String>>();
        /* dynamic thread count */
        // one thread per 500 records
        int threadSize = 500;
        // total number of records
        int dataSize = logOutputResults.size();
        // number of threads, computed dynamically
        int threadNum = dataSize / threadSize + 1;
        /* fixed thread count variant
        // number of threads
        int threadNum = 6;
        // total number of records
        int dataSize = logOutputResults.size();
        // number of records handled by each thread
        int threadSize = dataSize / (threadNum - 1);
        */
        // flag: true when the data divides evenly, so the extra (empty) last slice is skipped
        boolean special = dataSize % threadSize == 0;
        List<String> cutList = null;
        // determine the data for each thread
        for (int i = 0; i < threadNum; i++) {
            if (i == threadNum - 1) {
                if (special) {
                    break;
                }
                cutList = logOutputResults.subList(threadSize * i, dataSize);
            } else {
                cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
            }
            results.add(cutList);
        }
        return results;
    }
}
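As a quick illustration of the slicing logic, the standalone snippet below (hypothetical, not part of the original article) exercises splitList directly: with 3,000 records, threadNum works out to 7 and special is true, so the loop yields six sublists of 500 and skips the empty seventh slice; with 3,001 records the seventh sublist holds the single leftover record.
package com.test.service;

import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone check of the splitting behaviour described above.
public class SplitListDemo {
    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 3001; i++) {
            data.add("test data " + i);
        }
        // splitList does not touch any Spring-injected field, so it can be called directly here
        List<List<String>> slices = new TestService().splitList(data);
        System.out.println(slices.size());        // 7 slices for 3001 records
        System.out.println(slices.get(0).size()); // the first six slices hold 500 records each
        System.out.println(slices.get(6).size()); // the last slice holds the 1 leftover record
    }
}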
6. Controller test
import javax.annotation.Resource;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.test.service.TestService;

import io.swagger.annotations.ApiOperation;

@RestController
public class TestController {

    @Resource
    private TestService testService;

    @RequestMapping(value = "/log", method = RequestMethod.GET)
    @ApiOperation(value = "test")
    public String test() {
        testService.testMultiThread();
        return "success";
    }
}
Summary
Note that the rows end up inserted in no particular order, since the sublists are written concurrently by different threads.
The above is based on my personal experience. I hope it can serve as a reference, and I hope you will continue to support 代码网.