当前位置: 代码网 > it编程>编程语言>Java > SpringBoot分段处理List集合多线程批量插入数据方式

SpringBoot分段处理List集合多线程批量插入数据方式

2025年09月15日 Java 我要评论
项目场景大数据量的list集合,需要把list集合中的数据批量插入数据库中。解决方案拆分list集合后,然后使用多线程批量插入数据库1.实体类package com.test.entity;impor

项目场景

大数据量的list集合,需要把list集合中的数据批量插入数据库中。

解决方案

拆分list集合后,然后使用多线程批量插入数据库

1.实体类

package com.test.entity;

import lombok.data;

@data
public class testentity {
	
	private string id;
	private string name;
}

2.mapper

如果数据量不大,用foreach标签就足够了。如果数据量很大,建议使用batch模式。

package com.test.mapper;

import java.util.list;

import org.apache.ibatis.annotations.insert;
import org.apache.ibatis.annotations.param;

import com.test.entity.testentity;

public interface testmapper {
	
	/**
	  * 1.用于使用batch模式,executortype.batch开启批处理模式
	  * 数据量很大,推荐这种方式
	  */
	@insert("insert into test(id, name) "
			   + " values"
			   + " (#{id,jdbctype=varchar}, #{name,jdbctype=varchar})")
	void testinsert(testentity testentity);
	
	/**
	  * 2.使用foreach标签,批量保存
	  * 数据量少可以使用这种方式
	  */
	@insert("insert into test(id, name) "
			   + " values"
			   + " <foreach collection='list' item='item' index='index' separator=','>"
			   + " (#{item.id,jdbctype=varchar}, #{item.name,jdbctype=varchar})"
			   + " </foreach>")
	void testbatchinsert(@param("list") list<testentity> list);
}

3.spring容器注入线程池bean对象

package com.test.config;

import java.util.concurrent.executor;
import java.util.concurrent.threadpoolexecutor;

import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.scheduling.annotation.enableasync;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;

@configuration
@enableasync
public class executorconfig {
    /**
     * 异步任务自定义线程池
     */
    @bean(name = "asyncserviceexecutor")
    public executor asyncserviceexecutor() {
    	threadpooltaskexecutor executor = new threadpooltaskexecutor();
        //配置核心线程数
        executor.setcorepoolsize(50);
        //配置最大线程数
        executor.setmaxpoolsize(500);
        //配置队列大小
        executor.setqueuecapacity(300);
        //配置线程池中的线程的名称前缀
        executor.setthreadnameprefix("testexecutor-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
        //调用shutdown()方法时等待所有的任务完成后再关闭
        executor.setwaitfortaskstocompleteonshutdown(true);
        //等待所有任务完成后的最大等待时间
		executor.setawaitterminationseconds(60);
        return executor;
    }
}

4.创建异步线程业务类

package com.test.service;

import java.util.list;
import java.util.concurrent.countdownlatch;

import org.apache.ibatis.session.executortype;
import org.apache.ibatis.session.sqlsession;
import org.apache.ibatis.session.sqlsessionfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.service;

import com.test.entity.testentity;
import com.test.mapper.testmapper;

@service
public class asyncservice {
	@autowired
	private sqlsessionfactory sqlsessionfactory;
	
	@async("asyncserviceexecutor")
    public void executeasync(list<string> logoutputresults, countdownlatch countdownlatch) {
		//获取session,打开批处理,因为是多线程,所以每个线程都要开启一个事务
        sqlsession session = sqlsessionfactory.opensession(executortype.batch);
		
        try{
        	
        	testmapper mapper = session.getmapper(testmapper.class);
        	
            //异步线程要做的事情
        	for (int i = 0; i < logoutputresults.size(); i++) {
    			system.out.println(thread.currentthread().getname() + "线程:" + logoutputresults.get(i));
    			
    			testentity test = new testentity();
    			//test.set()
    			//.............
    			//批量保存
    			mapper.testinsert(test);
    			//每1000条提交一次防止内存溢出
    			if(i%1000==0){
    				session.flushstatements();
    			}
			}
        	//提交剩下未处理的事务
    		session.flushstatements();
        }finally {
            countdownlatch.countdown();// 很关键, 无论上面程序是否异常必须执行countdown,否则await无法释放
			if(session != null){
				session.close();
			}
        }
    }
}

5.拆分list调用异步的业务方法

package com.test.service;

import java.util.arraylist;
import java.util.list;
import java.util.concurrent.countdownlatch;

import javax.annotation.resource;

import org.springframework.stereotype.service;


@service
public class testservice {

	@resource
	private asyncservice asyncservice;
	
	public int testmultithread() {
        list<string> logoutputresults = gettestdata();
        //按线程数拆分后的list
        list<list<string>> lists = splitlist(logoutputresults);
        countdownlatch countdownlatch = new countdownlatch(lists.size());
        for (list<string> listsub:lists) {
            asyncservice.executeasync(listsub, countdownlatch);
        }
        try {
            countdownlatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
            // 这样就可以在下面拿到所有线程执行完的集合结果
        } catch (exception e) {
            e.printstacktrace();
        }
        return logoutputresults.size();
    }
	
	public list<string> gettestdata() {
		list<string> logoutputresults = new arraylist<string>();
        for (int i = 0; i < 3000; i++) {
        	logoutputresults.add("测试数据"+i);
		}
        return logoutputresults;
    }
	
	public list<list<string>> splitlist(list<string> logoutputresults) {
		list<list<string>> results = new arraylist<list<string>>();
		
		/*动态线程数方式*/
		// 每500条数据开启一条线程
		int threadsize = 500;
		// 总数据条数
		int datasize = logoutputresults.size();
		// 线程数,动态生成
		int threadnum = datasize / threadsize + 1;
	 
	    /*固定线程数方式
		    // 线程数
		    int threadnum = 6;
		    // 总数据条数
		    int datasize = logoutputresults.size();
		    // 每一条线程处理多少条数据
		    int threadsize = datasize / (threadnum - 1);
	    */
	 
		// 定义标记,过滤threadnum为整数
		boolean special = datasize % threadsize == 0;
	 
		list<string> cutlist = null;
	 
		// 确定每条线程的数据
		for (int i = 0; i < threadnum; i++) {
			if (i == threadnum - 1) {
				if (special) {
					break;
				}
				cutlist = logoutputresults.sublist(threadsize * i, datasize);
			} else {
				cutlist = logoutputresults.sublist(threadsize * i, threadsize * (i + 1));
			}
			
			results.add(cutlist);
		}
		
        return results;
    }
}

6.controller测试

@restcontroller
public class testcontroller {
	
	@resource
	private testservice testservice;
	

	@requestmapping(value = "/log", method = requestmethod.get)
	@apioperation(value = "测试")
	public string test() {
		testservice.testmultithread();
		return "success";
	}
}

总结

注意这里执行插入的数据是无序的。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com