当前位置: 代码网 > it编程>编程语言>Java > Java实现Excel通用异步导出框架方式

Java实现Excel通用异步导出框架方式

2025年10月23日 Java 我要评论
请注意!请注意!请注意!(代码中很多实体类需要自己创建哈 比如前端提交的参数 比如数据库实体类,当前文章是面向一定开发经验的选手,cv是没用的)1、审题1.小于5w直接导出,大于5w则需要创建任务2.

请注意!请注意!请注意!(代码中很多实体类需要自己创建哈 比如前端提交的参数 比如数据库实体类,当前文章是面向一定开发经验的选手,cv是没用的)

1、审题

1.小于5w直接导出,大于5w则需要创建任务

2.异步导出

3.框架

分析1

小于5w直接导出,所以我们框架需要从使用者手里知道 本次导出的数据总量是多少

分析2

既然是异步导出,所以我们不能让主线程去执行导出操作,所以主线程只管将导出任务的相关信息交给其他线程即可

分析3

既然是框架,那么我们不能关心具体的导出实现逻辑,例如怎么获取表的数据,怎么查询总量,但是我们需要关心一些通用的逻辑,例如创建文件,使用导出组件进行导出,等等

好了,现在开始实现。

2、创建任务表,记录任务的状态,进度,以及任务的类型

-- public.export_task definition

-- drop table

-- drop table public.export_task;

create table public.export_task (
	id text not null,
	export_key text not null,  -- 任务类型
	params text not null, -- 任务所需要的参数
	status int2 not null, -- 任务状态
	progress text not null, -- 任务进度
	create_user text not null, -- 创建人
	gmt_create text not null, -- 创建时间
	single_export_num int4 not null default 50000, -- 单次导出条数
	file_name text not null, -- 导出的文件名称
	file_path text null, -- 导出的文件路径
	complete_time text null, -- 导出完成时间
	constraint export_task_pkey primary key (id) -- 主键
);

-- 索引
create index export_task_export_key_index on public.export_task (export_key text_ops);

3、有了任务表后,就可以开始考虑怎么执行任务了

我的思路是:新增任务 -> 必要校验完成 -> 通过任务类型(也就是上表中的export_key)找到需要处理的处理类 -> 调用获取总量的方法,判断总量是否大于5w

  • 总量大于5w -> 将当前任务插入数据库 -> 然后将当前任务丢给队列,等待被消费处理
  • 总量小于5w -> 调用处理类的导出方法直接进行导出

上代码:controller类,新增任务(公司代码里面有记录日志的逻辑你们不能复用)

 @post
@path("/task-manage")
@produces({ mediatype.application_json })
@apioperation(value = "新建导出任务", notes = "新建导出任务", httpmethod = "post", tags = {"新建导出任务"})
public void taskcreate(taskinfodto condition, @context httpservletresponse response) throws exception {
servicebaseinfobean infobean = new servicebaseinfobean(userthreadlocal.getusername(),
userthreadlocal.getremotehost(),userthreadlocal.getlanguageoption());

exporttaskmgr.operation operation = exporttaskmgr.operation.direct_export;
exporttaskmgr.initoperlogbean(infobean, operation);
string detailzh = string.format(operation.getdetailzh(), condition.getfilename());
string detailen = string.format(operation.getdetailen(), condition.getfilename());
try{
condition.checkfilename();
long alldatatotal = taskcommandapplicationservice.exportorcreatetask(condition, response);
if(alldatatotal > globalconstants.export_default_count){
operation = exporttaskmgr.operation.create_export_task;
exporttaskmgr.initoperlogbean(infobean, operation);
}
detailzh = string.format(operation.getdetailzh(), condition.getfilename());
detailen = string.format(operation.getdetailen(), condition.getfilename());
exporttaskmgr.refreshsuccessdetail(infobean,operation, detailzh,detailen);
}
catch (exception e){
responseutils.resetcontenttype(response);
exporttaskmgr.refreshfaildetail(infobean,operation, detailzh,detailen,e);
throw e;
}
finally {
// 发送操作记录日志
string operlogmsg = json.tojsonstring(infobean.getoperlogbean());
msgsenderservice.sendmsgasync(operlogbean.imop_log_manage_topic, operlogmsg);
}
}

service: 通过当前任务类型 调用工厂找到处理类 然后... 看注释把

 @override
public long exportorcreatetask(taskinfodto condition, httpservletresponse response) throws exception {
bifunction<exporttaskpo, map<string, future<?>>, exporthandler> handlerfuc = exporthandlerfactory
.gethandlerbyexportkey(condition.getexportkey());
if(null == handlerfuc){
throw new monitorexception(-1, i18nconstants.unknown_export_type);
}

// 新建任务相关信息 暂不入库
exporttaskpo taskpo = new exporttaskpo();
taskpo.setid(uuid.randomuuid().tostring());
taskpo.setprogress(globalconstants.default_value);
taskpo.setstatus(exportstatusenums.init.getstatus());
taskpo.setcreateuser(userthreadlocal.getusername());
taskpo.setfilename(condition.getfilename());
taskpo.setexportkey(condition.getexportkey());
taskpo.setparams(condition.getparams());
taskpo.setgmtcreate(datetimeservice.getcurrenttime());
taskpo.setsingleexportnum(globalconstants.export_default_count);
taskpo.setfilepath(globalconstants.default_value);
taskpo.setcompletetime(globalconstants.default_value);

// 构建handler
exporthandler handler = handlerfuc.apply(taskpo, new hashmap<>());

// 获取total 如果大于单次导出条数则创建任务 然后后台异步导出
long total = handler.gettotal();
if(total > taskpo.getsingleexportnum()){
exporttaskrepository.save(taskpo);
exporthandlerfactory.addtasktoque(new arraylist<>(collections.singletonlist(taskpo)));
}
// 满足单次导出 则直接导出
else {
handler.directexport(response);
}

return total;
}

工厂类的具体实现:

import lombok.extern.slf4j.slf4j;
import org.apache.commons.collections4.collectionutils;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.bifunction;

@slf4j
@component
public class exporthandlerfactory {
private static final exporttaskrepository export_task_repository = springutil.getbean(exporttaskrepository.class);
/**
 * 线程池  3个线程消息导出任务
 */
private static final executorservice thread_pool = executors.newfixedthreadpool(3);

/**
 * 记录执行中的线程
 */
private static final map<string, future<?>> running_thread = new concurrenthashmap<>();

/**
 * 任务队列 轮训消费此队列
 */
private static final queue<exporttaskpo> task_que = new concurrentlinkedqueue<>();

/**
 * 定义处理器 例如demo类型导出任务由demo_handler进行处理
 */
private static final map<string, bifunction<exporttaskpo,map<string, future<?>>,exporthandler>> handler_map = new concurrenthashmap<>();
static {
handler_map.put(exporttypeenums.demo_export.getexportkey(), demoexporthandler::new);
}

@postconstruct
@suppresswarnings("infiniteloopstatement")
public static void init() {
runnable runnable = () -> {
while (true) {
try {
// 每隔50ms拉取一次导出任务 如果已经消费完 则不再执行
timeunit.milliseconds.sleep(50l);
exporttaskpo taskinfo = task_que.poll();
if(taskinfo == null) {
continue;
}

// 根据导出类型key 找到handler 开始处理
bifunction<exporttaskpo,map<string, future<?>>,exporthandler> bifunction =
handler_map.get(taskinfo.getexportkey());
log.info("begin consume taskinfo={},bifunction={}",taskinfo,bifunction);

if(bifunction != null) {
// 设置任务信息 以及记录运行状态的map 得到处理者
exporthandler exporthandler = bifunction.apply(taskinfo, running_thread);
log.info("begin consume exporthandler={}",exporthandler);
// 执行任务
future<boolean> exceutefuture = thread_pool.submit(exporthandler);
// 将正在执行的任务放入map中
running_thread.put(taskinfo.getid(),exceutefuture);
}
}
catch (exception e) {
log.error("consume export task error.",e);
}
}
};

executors.newfixedthreadpool(1).execute(runnable);
}

public static void stoptaskbytaskid(string taskid){
// 从运行中的map 拿到执行中的线程
future<?> future = running_thread.get(taskid);

// 如果存在  直接停止运行
if(future != null){
try{
future.cancel(boolean.true);
}
catch (exception e){
log.debug("destroy thread error",e);
}
finally {
running_thread.remove(taskid);
}
}
}

public static bifunction<exporttaskpo,map<string, future<?>>,exporthandler> gethandlerbyexportkey(string exportkey){
// 根据导出类型key 找到handler 开始处理
return handler_map.get(exportkey);
}


public static void addtasktoque(list<exporttaskpo> needexecutetask){
if(collectionutils.isnotempty(needexecutetask)){
task_que.addall(needexecutetask);
}
}

public static void inittaskwhenapprun(){
try{
list<exporttaskpo> needexecutetask = export_task_repository
.lambdaquery()
.in(exporttaskpo::getstatus, arrays.aslist(exportstatusenums.init.getstatus(), exportstatusenums.running.getstatus()))
.orderbyasc(exporttaskpo::getgmtcreate)
.list();
log.info("initallwaitexecutetask={}",needexecutetask);
addtasktoque(needexecutetask);
}
catch (exception e){
log.error("initallwaitexecutetask error",e);
}
}
}

工厂类定了轮训消费,定义了哪个类型的任务由哪个处理器处理,定义了执行中的任务以便删除任务时立刻任务执行,等等

工厂类相当于总管,但是具体的实现是由exporthandler这个类来完成处理,那么这个类又做了什么些事情呢~~~

import javax.servlet.http.httpservletresponse;
import java.io.bufferedoutputstream;
import java.io.file;
import java.math.bigdecimal;
import java.math.roundingmode;
import java.net.urlencoder;
import java.nio.charset.standardcharsets;
import java.nio.file.files;
import java.util.*;
import java.util.concurrent.callable;
import java.util.concurrent.future;
import java.util.zip.zipentry;
import java.util.zip.zipoutputstream;

@slf4j
public abstract class exporthandler implements callable<boolean> {
public final exporttaskpo taskinfo;
private final map<string, future<?>> runmap;
private static final exporttaskrepository export_task_repository = springutil.getbean(exporttaskrepository.class);
private static final fileconfig file_config = springutil.getbean(fileconfig.class);
private static final datetimeservice date_time_service = springutil.getbean(datetimeservice.class);

public exporthandler(exporttaskpo taskinfo,map<string,future<?>> runmap){
this.taskinfo = taskinfo;
this.runmap = runmap;
}

/**
 * 数据小于5w条时直接导出
 * @param response 导出response
 * @throws exception 异常时直接抛出
 */
public void directexport(httpservletresponse response) throws exception {
fileutils.checkfilename(taskinfo.getfilename());

// 设置响应类型以及文件名
response.reset();
response.setcontenttype(globalconstants.content_type_xlsx);
string encodename = urlencoder.encode(taskinfo.getfilename(),globalconstants.encoder_utf8);
response.setheader(globalconstants.content_disposition_key, globalconstants.content_disposition_file_prefix.concat(encodename));

// 写入数据
easyexcel
.write(response.getoutputstream())
.exceltype(exceltypeenum.xlsx)
.charset(standardcharsets.utf_8)
.sheet(filenameutil.mainname(taskinfo.getfilename()))
.head(getheader())
.dowrite(getdata(null,null));
}

/**
 * 默认的数据大于5w时的异步导出处理逻辑 默认单sheet页 简单实现更新任务进度
 * 如果有复杂的多sheet页导出 请在你的handler里面重写此方法
 */
public void asyncexport(){
zipoutputstream zipout = null;
file file = null;
// 执行导出逻辑
try {
// 任务执行中
log.info("begin export,taskid:{}",taskinfo.getid());
updatetaskinfo(bigdecimal.zero + globalconstants.per_cent,exportstatusenums.running.getstatus(),null);

// 检查文件名称 获取没有后缀的文件名
fileutils.checkfilename(taskinfo.getfilename());
string filemainname = filenameutil.mainname(taskinfo.getfilename());

// 先在服务器生成文件,得到输出流
string filepath = fileutils.getdatefilename(filemainname, globalconstants.file_type_zip);
file = fileutils.getabsolutefile(file_config.getexportpath(), filepath);
zipout = new zipoutputstream(new bufferedoutputstream(files.newoutputstream(file.topath())));

// 根据总量数据以及分页每条的数据  得到要导出多少次并且遍历 开始导入数据
long exportcount = calculateexportcount(gettotal(),taskinfo.getsingleexportnum());

// 遍历导出的次数 每次生成一个excel文件
dealexport(exportcount,zipout);

// 导出完成后更新文件路径,完成时间以及状态
taskinfo.setfilepath(filepath);
updatetaskinfo(null,exportstatusenums.success.getstatus(),date_time_service.getcurrenttime());
log.info("end export,taskid:{}",taskinfo.getid());
}
// 异常打印异常信息 更新任务进度为-- 状态为失败  如果生成了文件 还需要把文件删除
catch (exception e) {
log.error("export error,taskid:{}",taskinfo.getid(),e);
updatetaskinfo(globalconstants.default_value,exportstatusenums.failed.getstatus(),date_time_service.getcurrenttime());

if(file != null){
boolean delete = file.delete();
log.info("export error,taskid:{},file delete:{}",taskinfo.getid(),delete);
}
}
// 最终从执行中的map里移除当前任务
finally {
if(zipout != null){
fileutils.safeclose(zipout);
}
}
}

private void dealexport(long exportcount,zipoutputstream zipout) throws exception{
// 遍历导出的次数 每次生成一个excel文件
for (long i = 1; i <= exportcount; i++) {
// 新建zip中的其中一个文件
string eachname = string.format("%02d", i) + exceltypeenum.xlsx.getvalue();
zipout.putnextentry(new zipentry(new string(eachname.getbytes(standardcharsets.utf_8))));

// 新建好了后往zip写入数据
list<list<string>> singledata = getdata(i, taskinfo.getsingleexportnum());
easyexcel
.write(zipout)
.autoclosestream(boolean.false)
.exceltype(exceltypeenum.xlsx)
.charset(standardcharsets.utf_8)
.sheet()
.head(getheader())
.dowrite(singledata);

// 关闭entry 代表zip当前的其中一个文件已经写入结束
zipout.closeentry();

// 更新进度
updatetaskinfo(calculateprogress(i,exportcount),exportstatusenums.running.getstatus(),null);
singledata.clear();
}
}

private void updatetaskinfo(string progress,integer status,string completetime){
optional.ofnullable(progress).ifpresent(taskinfo::setprogress);
optional.ofnullable(status).ifpresent(taskinfo::setstatus);
optional.ofnullable(completetime).ifpresent(taskinfo::setcompletetime);
export_task_repository.updatebyid(taskinfo);
}

private long calculateexportcount(long total,long singleexportnum){
long exportcount = total / singleexportnum;
long mod = total % singleexportnum;
if(mod != 0){
exportcount += 1;
}

return exportcount;
}

private string calculateprogress(long thiscount,long exportcount){
bigdecimal progress = bigdecimal.valueof(thiscount)
.divide(bigdecimal.valueof(exportcount), 2, roundingmode.half_up)
.multiply(new bigdecimal(globalconstants.progress_over))
.setscale(bigdecimal.zero.intvalue(),roundingmode.half_up);

return progress + globalconstants.per_cent;
}

@override
public boolean call(){
try{
asyncexport();
return boolean.true;
}
catch (exception e){
log.error("export task:{} error",taskinfo.getid(),e);
return boolean.false;
}
finally {
if(taskinfo != null && runmap != null){
runmap.remove(taskinfo.getid());
}
}
}

/**
 * 需要实现的查询total的方法
 */
public abstract long gettotal() throws exception;

/**
 * 需要实现的组装表头的方法
 */
public abstract list<list<string>> getheader();

/**
 * 需要实现的组装表体数据的方法
 */
public abstract list<list<string>> getdata(long pageno,long pagesize) throws exception;
}

噢,原来是一个抽象类,里面把需要用到的一些逻辑,例如生成文件,导入等都已经实现了,那么所有用到该架子导出的人 只需要继承当前类,实现里面的gettotal getheader getdata三个方法就可以啦,比如我们工厂类里面写的这段代码,demo_export由demoexporthandler进行处理,现在最后来看看这个demoexporthandler怎么实现的把

static {
handler_map.put(exporttypeenums.demo_export.getexportkey(), demoexporthandler::new);
}

demoexporthandler代码:

import java.util.*;
import java.util.concurrent.future;
import java.util.stream.collectors;

@slf4j
public class demoexporthandler extends exporthandler {

private static final standardpointrepository standard_point_repository = springutil.getbean(standardpointrepository.class);
private static final jsonservice json_service = springutil.getbean(jsonservice.class);
private static final i18nutilservice i_18_n_util_service = springutil.getbean(i18nutilservice.class);

public demoexporthandler(exporttaskpo taskinfo, map<string, future<?>> runmap) {
super(taskinfo, runmap);
}

@override
@suppresswarnings("unchecked")
public long gettotal() throws exception{

// 导出参数 反序列化为自己想要的类型的对象
map<string,object> parammap = getparamobj(taskinfo.getparams());

// 取出自己想要的数据,根据条件进行导出
list<string> pointlist = (list<string>)parammap.get("pointlist");

// 此处demo使用 total请根据自己的逻辑查询
list<pointinfobean> pointbeans = standard_point_repository.selectpointbyidlist(pointlist);
return (long) pointbeans.size();
}

@override
public list<list<string>> getheader() {
// 组装表头
return arrays.aslist(collections.singletonlist("测点id"),
collections.singletonlist("测点名称"),collections.singletonlist("测点单位"));
}

@override
@suppresswarnings("unchecked")
public list<list<string>> getdata(long pageno, long pagesize) throws exception {
// 模拟执行导出逻辑执行20s
thread.sleep(20000l);

// 导出参数 反序列化为自己想要的类型的对象
map<string,object> parammap = getparamobj(taskinfo.getparams());

// 取出自己想要的数据,根据条件进行导出
string lang = (string)parammap.get("language-option");
list<string> pointlist = (list<string>)parammap.get("pointlist");

// 查询数据
list<pointinfobean> pointbeans = standard_point_repository.selectpointbyidlist(pointlist);
list<list<string>> alldata = pointbeans
.stream()
.map(bean -> arrays.aslist(bean.getid(), i_18_n_util_service
.getmapfieldbylanguageoption(bean.getnamei18n(), lang), bean.getunit()))
.collect(collectors.tolist());

// 分页或者不分页返回
if(pageno == null || pagesize == null){
return alldata;
}

return pageutils.getpagelist(alldata,pageno.intvalue(),pagesize.intvalue());
}

@suppresswarnings("unchecked")
private map<string,object> getparamobj(string params) throws uedmexception {
map<string,object> parammap = json_service.jsontoobject(params,map.class);
log.debug("getparamobj params={},obj={}",params,parammap);

return parammap;
}
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com