Project scenario
A list collection holding a large amount of data whose elements need to be batch-inserted into the database.
Solution
Split the list into chunks, then use multiple threads to batch-insert the chunks into the database.
1. Entity class
package com.test.entity;

import lombok.Data;

@Data
public class TestEntity {

    private String id;

    private String name;
}
2. Mapper
If the data volume is small, the foreach tag is enough. For large data volumes, batch mode (ExecutorType.BATCH) is recommended; a small usage sketch for the foreach variant follows the mapper interface below.
package com.test.mapper;

import java.util.List;

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;

import com.test.entity.TestEntity;

public interface TestMapper {

    /**
     * 1. Single-row insert intended for batch mode: open the session with
     *    ExecutorType.BATCH to enable batching.
     *    Recommended when the data volume is large.
     */
    @Insert("insert into test(id, name) " +
            " values" +
            " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    void testInsert(TestEntity testEntity);

    /**
     * 2. Multi-value insert using the foreach tag, for batch saving.
     *    Suitable when the data volume is small.
     *    Note: dynamic SQL in an annotation must be wrapped in a <script> tag.
     */
    @Insert("<script>" +
            " insert into test(id, name)" +
            " values" +
            " <foreach collection='list' item='item' index='index' separator=','>" +
            " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})" +
            " </foreach>" +
            "</script>")
    void testBatchInsert(@Param("list") List<TestEntity> list);
}
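The foreach variant sends one multi-value INSERT per call, so the caller should keep each chunk reasonably small (for example, with MySQL a very large statement can exceed max_allowed_packet). The sketch below shows one way it might be called; the helper service, its names, and the chunk size of 500 are illustrative and are not part of the original article.

package com.test.service;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

// Sketch only: a hypothetical helper showing how the foreach-based mapper method could be used.
@Service
public class SmallBatchService {

    @Resource
    private TestMapper testMapper;

    public void saveSmallBatch(List<TestEntity> rows) {
        // Keep each chunk modest (500 here is an arbitrary example) so the generated
        // multi-value INSERT stays within database limits such as MySQL's max_allowed_packet.
        int chunkSize = 500;
        for (int start = 0; start < rows.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, rows.size());
            testMapper.testBatchInsert(new ArrayList<>(rows.subList(start, end)));
        }
    }
}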
3. Register a thread pool bean in the Spring container
package com.test.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     * Custom thread pool for asynchronous tasks.
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // core pool size
        executor.setCorePoolSize(50);
        // maximum pool size
        executor.setMaxPoolSize(500);
        // queue capacity
        executor.setQueueCapacity(300);
        // name prefix for the pool's threads
        executor.setThreadNamePrefix("testExecutor-");
        // rejection policy: what to do with new tasks once the pool has reached its max size
        // CALLER_RUNS: the task is executed in the caller's thread instead of a pool thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // when shutdown() is called, wait for all tasks to finish before closing the pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // maximum time to wait for all tasks to finish
        executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}
4. Create the asynchronous service class
package com.test.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

@Service
public class AsyncService {

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
        // Open a session in batch mode. Because this runs in multiple threads,
        // each thread gets its own session and transaction.
        SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
        try {
            TestMapper mapper = session.getMapper(TestMapper.class);
            // work done by the asynchronous thread
            for (int i = 0; i < logOutputResults.size(); i++) {
                System.out.println(Thread.currentThread().getName() + " thread: " + logOutputResults.get(i));
                TestEntity test = new TestEntity();
                //test.set()
                //.............
                // batch save
                mapper.testInsert(test);
                // flush every 1000 rows to keep the batch from growing too large in memory
                if (i % 1000 == 0) {
                    session.flushStatements();
                }
            }
            // flush whatever is left in the batch and commit the transaction
            session.flushStatements();
            session.commit();
        } finally {
            // Crucial: count down whether or not the code above threw an exception,
            // otherwise await() in the caller would never return.
            countDownLatch.countDown();
            if (session != null) {
                session.close();
            }
        }
    }
}
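As a variation (a sketch only, not from the original article), the same per-thread batch session can be wrapped in an @Async method that returns a CompletableFuture, so the caller waits with CompletableFuture.allOf instead of passing a CountDownLatch around. The class and method names below are hypothetical.

package com.test.service;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

@Service
public class AsyncFutureService {

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    // Hypothetical alternative: return a future instead of counting down a latch.
    @Async("asyncServiceExecutor")
    public CompletableFuture<Integer> executeAsync(List<String> chunk) {
        // Each thread still opens its own batch-mode session and transaction.
        try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
            TestMapper mapper = session.getMapper(TestMapper.class);
            for (String row : chunk) {
                TestEntity test = new TestEntity();
                // populate test from row ...
                mapper.testInsert(test);
            }
            session.flushStatements();
            session.commit();
            return CompletableFuture.completedFuture(chunk.size());
        }
    }
}

The caller would then collect the returned futures and call CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join() at the point where the original code calls countDownLatch.await().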
5. Split the list and call the asynchronous service method
package com.test.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

@Service
public class TestService {

    @Resource
    private AsyncService asyncService;

    public int testMultiThread() {
        List<String> logOutputResults = getTestData();
        // list split into one sub-list per thread
        List<List<String>> lists = splitList(logOutputResults);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<String> listSub : lists) {
            asyncService.executeAsync(listSub, countDownLatch);
        }
        try {
            // Block until every worker thread has finished, so the code below
            // only runs after all chunks have been processed and the combined
            // result can be used safely.
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logOutputResults.size();
    }

    public List<String> getTestData() {
        List<String> logOutputResults = new ArrayList<String>();
        for (int i = 0; i < 3000; i++) {
            logOutputResults.add("test data " + i);
        }
        return logOutputResults;
    }

    public List<List<String>> splitList(List<String> logOutputResults) {
        List<List<String>> results = new ArrayList<List<String>>();

        /* dynamic thread count */
        // start one thread per 500 rows
        int threadSize = 500;
        // total number of rows
        int dataSize = logOutputResults.size();
        // number of threads, derived from the data size
        int threadNum = dataSize / threadSize + 1;

        /* fixed thread count
        // number of threads
        int threadNum = 6;
        // total number of rows
        int dataSize = logOutputResults.size();
        // rows handled per thread
        int threadSize = dataSize / (threadNum - 1);
        */

        // flag marking whether the data divides evenly into threadSize chunks
        boolean special = dataSize % threadSize == 0;
        List<String> cutList = null;
        // assign the data for each thread
        for (int i = 0; i < threadNum; i++) {
            if (i == threadNum - 1) {
                if (special) {
                    break;
                }
                cutList = logOutputResults.subList(threadSize * i, dataSize);
            } else {
                cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
            }
            results.add(cutList);
        }
        return results;
    }
}
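If only the splitting logic is needed, the same result as the "dynamic thread count" branch above can be produced by a shorter generic helper. This is just a sketch; the class name, helper name, and the defensive copy of each subList view are choices made here, not taken from the article.

package com.test.util;

import java.util.ArrayList;
import java.util.List;

public final class ListSplitter {

    // Splits the source list into consecutive chunks of at most chunkSize elements.
    public static <T> List<List<T>> partition(List<T> source, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < source.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, source.size());
            // subList returns a view; copy it so each chunk is independent of the source list
            chunks.add(new ArrayList<>(source.subList(start, end)));
        }
        return chunks;
    }
}

With this helper, the call to splitList(logOutputResults) in the service above could be replaced by ListSplitter.partition(logOutputResults, 500).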
6. Controller test
@RestController
public class TestController {

    @Resource
    private TestService testService;

    @RequestMapping(value = "/log", method = RequestMethod.GET)
    @ApiOperation(value = "test")
    public String test() {
        testService.testMultiThread();
        return "success";
    }
}
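Once the application is running, the endpoint can be triggered with a plain GET request. The sketch below assumes the application listens on the default local port 8080, which the article does not specify.

package com.test.client;

import org.springframework.web.client.RestTemplate;

// Sketch only: a minimal client that triggers the test endpoint.
public class TestClient {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();
        // Calls GET /log, which splits the test data and inserts it concurrently.
        String result = restTemplate.getForObject("http://localhost:8080/log", String.class);
        System.out.println(result); // expected to print "success"
    }
}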
Summary
Note that the rows are inserted in no particular order, since the chunks are written concurrently by independent threads and sessions.
The above is based on personal experience; I hope it serves as a useful reference, and I also hope you will continue to support 代码网.