一、无返回值任务函数
// 数据分批
list<list<statisticsdto>> batches = lists.partition(statisticslist, batch_size);
list<completablefuture<void>> futures = new arraylist<>(batches.size());
// 数据处理
for (int i = 0; i < batches.size(); i++) {
logger.info("批次 " + i + " 开始处理...");
string logid = logidthreadlocal.getlogid(); // 传递主线程的 logid
list<statisticsdto> batchdata = batches.get(i);
completablefuture<void> future = completablefuture.runasync(() -> {
try {
logidthreadlocal.setlogid(logid);
processbatch(batchdata);
} finally {
logidthreadlocal.clean();
}
});
futures.add(future);
}
// 等待所有的异步任务完成
completablefuture<void> allof = completablefuture.allof(futures.toarray(new completablefuture[0]));
allof.join();
二、带返回值任务函数
// 数据分批
list<list<statisticsdto>> batches = lists.partition(statisticslist, batch_size);
list<completablefuture<list<statisticsdto>>> futures = new arraylist<>(batches.size());
// 数据处理
for (int i = 0; i < batches.size(); i++) {
logger.info("批次 " + i + " 开始处理...");
string logid = logidthreadlocal.getlogid(); // 传递主线程的 logid
list<statisticsdto> batchdata = batches.get(i);
completablefuture<list<doctoravataranalysisdto>> future = completablefuture.supplyasync(() -> {
try {
logidthreadlocal.setlogid(logid);
return processbatch(batchdata);
} finally {
logidthreadlocal.clean();
}
});
futures.add(future);
}
// 等待所有 cf 完成并合并结果
completablefuture<void> allof = completablefuture.allof(futures.toarray(new completablefuture[0]));
list<statisticsdto> result = allof.thenapply(
v -> futures.stream().map(completablefuture::join).flatmap(list::stream).collect(collectors.tolist())
).join();
到此这篇关于java使用completablefuture分批处理任务实现的文章就介绍到这了,更多相关java completablefuture分批处理内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论