前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序oom问题。
实现思路:
a.启用线程池,分页读取数据
b.使用 priorityblockingqueue 队列存储查询出来的数据,方便写入线程去
优先级队列特性 priorityblockingqueue是一个优先级队列,这意味着它里面的元素是按照某种优先级顺序进行排列的。元素的优先级是通过元素自身的自然顺序(如果元素实现了comparable接口)或者通过一个自定义的比较器(comparator)来确定的。 当从队列中获取元素时,具有最高优先级的元素会被首先返回。例如,在一个存储任务的priorityblockingqueue中,紧急任务可以被定义为具有较高优先级,这样它们就能在普通任务之前被执行。 阻塞队列特性 作为一个阻塞队列,priorityblockingqueue提供了阻塞操作。当队列为空时,试图从队列中获取元素的线程会被阻塞,直到队列中有可用的元素。 同样,当队列已满(不过priorityblockingqueue在理论上是无界的,这个情况比较特殊,后面会详细说)时,试图向队列中添加元素的线程会被阻塞,直到队列中有足够的空间。这种阻塞特性使得它在多线程环境下能够有效地协调生产者 - 消费者模式
c.开启单独的一个线程,做读取写入,利用join()方法 等待所有写入结束,直接返回。
completablefuture.runasync(() -> system.out.println("执行一个异步任务"));
d.利用 countdownlatch 做读取数据任务阻塞。
e.字典转换是用的反射。
以下是代码实现
1.配置线程池
/** * 导出线程池配置 */ @configuration @slf4j public class threadpoolexportexecutorconfig { @bean("exportserviceexecutor") @primary public executor exportserviceexecutor() { threadpooltaskexecutor executor = new threadpooltaskexecutor(); //配置核心线程数 executor.setcorepoolsize(12); //配置最大线程数 executor.setmaxpoolsize(20); //空闲时间 executor.setkeepaliveseconds(60); //配置队列大小 executor.setqueuecapacity(100); //配置线程池中的线程的名称前缀 executor.setthreadnameprefix("exportthread-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //执行初始化 executor.initialize(); return executor; } }
2.转换工具类
public class pojoinfoutil { /** * listdto转换 * * @param <e> entity类 * @param <d> dto类 * @param listinfoe listinfoe<e>类对象 * @return list<d> 转换后list<d> */ public static <e, d> list<d> listinfotodto(list<e> listinfo, class<d> dtoclass) { if (collectionutils.isempty(listinfo)) { return lists.newarraylist(); } // 创建 modelmapper 实例 modelmapper modelmapper = new modelmapper(); // 设置匹配策略为基于字段名称 modelmapper.getconfiguration().setmatchingstrategy(matchingstrategies.strict); // 使用流和 modelmapper 进行转换 list<d> list = listinfo.stream() .map(entity -> modelmapper.map(entity, dtoclass)) .collect(collectors.tolist()); // 释放旧数据 listinfo.clear(); return list; }
3.导出工具类
@component public class multithreadexportutil { @resource(name = "exportserviceexecutor") private threadpooltaskexecutor threadpooltaskexecutor; @autowired private dicadapter dicadapter; /** * * @param exportclazz 导出实体类 * @param dictfieldnames 字典转换字段 * @param filename 导出文件名称 * @param sheetnamepre sheet页名前缀 * @param service 查询接口类 * @param querywrapper 查询条件 */ public void exprtbythread(class<?> exportclazz,list<string> dictfieldnames,string filename,string sheetnamepre,iservice service, lambdaquerywrapper<?> querywrapper){ httpservletresponse response = httpservletutil.getresponse(); map<string, string> threadexportlimit = dicadapter.getbasicinfodicmapinfo("threadexportlimit"); if(objectutils.isempty(threadexportlimit) || !threadexportlimit.containskey("limit") || !threadexportlimit.containskey("maxlimit")){ throw new bizexception("请提前配置导出数量限制"); } //每次请求限制条数 int limit = integer.parseint(threadexportlimit.get("limit")); //最大限制条数 int maxlimit = integer.parseint(threadexportlimit.get("maxlimit")); int count = service.count(querywrapper); if(count > maxlimit) throw new bizexception("导出条数超出最大条数" + maxlimit + "限制,请调整查询条件"); //设置响应头 response.setcontenttype("application/vnd.ms-excel"); response.setcharacterencoding("utf-8"); try { string name = urlencoder.encode(filename, "utf-8"); response.setheader("content-disposition", "attachment;filename=" + name + ".xlsx"); } catch (exception e) { throw new bizexception(e.getmessage()); } // 查询次数 int i = (count + limit - 1) / limit; atomicreference<excelwriter> excelwriterref = new atomicreference<>(); servletoutputstream outputstream = null; try { outputstream = response.getoutputstream(); excelwriterref.set(easyexcel.write(outputstream, exportclazz).build()); } catch (ioexception e) { throw new runtimeexception(e); } if(i == 0){ writesheet writesheet = easyexcel.writersheet(sheetnamepre).head(exportclazz) .registerwritehandler(new longestmatchcolumnwidthstylestrategy()).build(); excelwriterref.get().write(null, writesheet); excelwriterref.get().finish(); return; } //计数器 countdownlatch countdownlatch = new countdownlatch(i); // 使用 priorityblockingqueue 作为查询结果的缓冲区 blockingqueue<threadqueryresult> resultqueue = new priorityblockingqueue<threadqueryresult>(); for(int j = 0; j < i; j++){ int pageno = j + 1; string sheetname = sheetnamepre + pageno; threadpooltaskexecutor.execute(()->{ try { ipage<t> page = service.page(new page<>(pageno, limit), querywrapper); list<t> records = page.getrecords(); //转换 list<?> exportlist = pojoinfoutil.listinfotodto(records, exportclazz); //处理字典数据 convertfieldwithdictionary(exportlist,dictfieldnames); resultqueue.put(new threadqueryresult(pageno, sheetname, exportlist)); }catch (exception e){ e.printstacktrace(); logutil.info("查询第" + pageno + "页时,发生异常" + e.getmessage()); throw new bizexception("查询第" + pageno + "页时,发生异常" + e.getmessage()); } }); countdownlatch.countdown(); } // 启动一个写入线程 按照查询顺序读取 写入 completablefuture<void> writefuture = completablefuture.runasync(() -> { for (int pageno = 1; pageno <= i; ) { boolean interrupted = false; try { while (true) { threadqueryresult result = resultqueue.take(); if (result.getpageno() == pageno) { writesheet writesheet = easyexcel.writersheet(result.getsheetname()).head(exportclazz) .registerwritehandler(new longestmatchcolumnwidthstylestrategy()).build(); excelwriter excelwriter = excelwriterref.get(); excelwriter.write(result.getrecords(), writesheet); result.getrecords().clear(); //及时释放写入数据 pageno++; break; } else { // 如果不是当前需要的 pageno,则放回队列 resultqueue.put(result); } } } catch (interruptedexception e) { interrupted = true; logutil.error("写入线程被中断: " + e.getmessage()); } finally { if (interrupted) { thread.currentthread().interrupt(); // 重新设置中断标志 } } } }, threadpooltaskexecutor); // 等待所有任务完成 try { countdownlatch.await(); } catch (interruptedexception e) { e.printstacktrace(); } //等待写入线程完成 writefuture.join(); // 关闭 excelwriter excelwriter excelwriter = excelwriterref.get(); if (excelwriter != null) { excelwriter.finish(); } } /** * 对给定的对象列表中的指定字段进行字典转换。 * * @param records 要转换的对象列表 * @param fielddictnames 字段名列表(带前缀) * @param <t> 对象类型 */ public <t> void convertfieldwithdictionary(list<t> records, list<string> fielddictnames) { if (collectionutils.isempty(records) || collectionutils.isempty(fielddictnames)) { return; } map<string, map<string, string>> dictmap = dicadapter.handlestaticbasicinfodicmap(); try { for (string fielddictname : fielddictnames) { //前缀区分各实体类字段 if(stringutils.isblank(fielddictname) || !fielddictname.contains("-")){ return; } string fieldname = fielddictname.split("-")[1]; for (t record : records) { field field = record.getclass().getdeclaredfield(fieldname); field.setaccessible(true); object fieldvalue = field.get(record); if (fieldvalue != null && dictmap.containskey(fielddictname)) { map<string, string> dictvaluemap = dictmap.get(fielddictname); if (dictvaluemap != null && dictvaluemap.containskey(fieldvalue)) { string dictvalue = dictvaluemap.get(fieldvalue); if (dictvalue != null) { field.set(record, dictvalue); } } } } } } catch(exception e){ logutil.error("导出转换字典失败,请联系管理员" + e); throw new bizexception("导出转换字典失败,请联系管理员"+ e); } } }
到此这篇关于java实现亿级千万级数据顺序导出的示例代码的文章就介绍到这了,更多相关java 亿级千万级顺序导出内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论