一、Spring Batch 基础架构
1.1 核心配置
// =========================================================================
// Core Spring Batch infrastructure: job launcher, explorer and registry.
// FIX(review): the original listing had every identifier lower-cased (a
// transcription artifact) and could not compile; proper Java casing restored.
// =========================================================================
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Asynchronous launcher: each job runs on its own thread so callers
     * (e.g. web controllers) return immediately instead of blocking until
     * the job completes.
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        // SimpleAsyncTaskExecutor spawns one new thread per launch — fine for
        // a low launch rate; switch to a pooled executor under heavy load.
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    /**
     * Read-only access to the Batch meta-data tables (job/step executions).
     *
     * @param dataSource the data source holding the BATCH_* tables
     */
    @Bean
    public JobExplorer jobExplorer(DataSource dataSource) throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    /** In-memory registry mapping job names to Job beans. */
    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    /**
     * Auto-registers every Job bean in the registry so jobs can later be
     * looked up by name (e.g. by a JobOperator).
     */
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }
}
1.2 数据库表结构
-- Spring Batch meta-data tables (created and managed by the framework itself):
-- batch_job_instance: job instances
-- batch_job_execution: job executions
-- batch_job_execution_params: job parameters
-- batch_step_execution: step executions
-- batch_job_execution_context: job execution context
-- batch_step_execution_context: step execution context
-- Custom monitoring table: one row per job execution (populated by an
-- application-side listener) so dashboards can read progress and outcome
-- without querying the framework's BATCH_* tables directly.
create table batch_job_monitoring (
id bigint primary key auto_increment,
job_name varchar(100) not null,
job_instance_id bigint,
job_execution_id bigint,
start_time timestamp,
end_time timestamp,
status varchar(20),
read_count bigint default 0,
write_count bigint default 0,
skip_count bigint default 0,
error_message text,
created_at timestamp default current_timestamp
);
二、简单任务示例
2.1 CSV 导入数据库
// =========================================================================
// Job: import products from a CSV file into the products table.
// FIX(review): restored proper Java identifier casing (the lower-cased
// listing could not compile); declared the DataSource field that
// databaseItemWriter() referenced without it ever being defined; replaced
// the double-brace-initialization anti-pattern in the field-set mapper;
// corrected the ":createdat" SQL placeholder to ":createdAt" so it matches
// the Product bean property resolved by the parameter source provider.
// =========================================================================
@Configuration
public class CsvToDatabaseJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // Was used by databaseItemWriter() but never declared in the original.
    @Autowired
    private DataSource dataSource;

    @Bean
    public Job csvToDatabaseJob() {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
                .start(csvToDatabaseStep())
                .listener(jobExecutionListener()) // defined elsewhere in the project — TODO confirm
                .build();
    }

    /**
     * Chunk-oriented step: reads 1000 CSV rows per transaction, maps them to
     * Product entities and batch-writes them. Fault tolerance: skip up to 10
     * validation failures, retry transient DB errors up to 3 times.
     */
    @Bean
    public Step csvToDatabaseStep() {
        return new StepBuilder("csvToDatabaseStep", jobRepository)
                .<ProductInput, Product>chunk(1000, transactionManager)
                .reader(csvItemReader())
                .processor(productItemProcessor())
                .writer(databaseItemWriter())
                .faultTolerant()
                .skipLimit(10)
                .skip(ValidationException.class)
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .listener(stepExecutionListener()) // defined elsewhere in the project — TODO confirm
                .build();
    }

    /** Reads input/products.csv with a delimited layout, skipping the header row. */
    @Bean
    public FlatFileItemReader<ProductInput> csvItemReader() {
        // Plain construction instead of double-brace initialization, which
        // creates a needless anonymous subclass holding an outer reference.
        BeanWrapperFieldSetMapper<ProductInput> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(ProductInput.class);
        return new FlatFileItemReaderBuilder<ProductInput>()
                .name("csvItemReader")
                .resource(new FileSystemResource("input/products.csv"))
                .delimited()
                .names("id", "name", "description", "price", "category")
                .fieldSetMapper(fieldSetMapper)
                .linesToSkip(1) // skip header row
                .build();
    }

    /** Maps a raw CSV record to a Product entity. */
    @Bean
    public ItemProcessor<ProductInput, Product> productItemProcessor() {
        return input -> {
            Product product = new Product();
            product.setId(input.getId());
            // NOTE(review): assumes name is never null in the CSV — confirm,
            // otherwise trim() throws NullPointerException (which is not in
            // the step's skip list).
            product.setName(input.getName().trim());
            product.setDescription(input.getDescription());
            // BigDecimal(String) preserves exact decimal precision for money.
            product.setPrice(new BigDecimal(input.getPrice()));
            product.setCategory(input.getCategory());
            product.setCreatedAt(LocalDateTime.now());
            return product;
        };
    }

    /** Upsert writer: insert, or update the existing row on duplicate key (MySQL syntax). */
    @Bean
    public JdbcBatchItemWriter<Product> databaseItemWriter() {
        return new JdbcBatchItemWriterBuilder<Product>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("insert into products (id, name, description, price, category, created_at) " +
                        "values (:id, :name, :description, :price, :category, :createdAt) " +
                        "on duplicate key update " +
                        "name = values(name), description = values(description), " +
                        "price = values(price), category = values(category)")
                .dataSource(dataSource)
                .build();
    }
}
2.2 数据库导出到文件
@configuration
public class databasetofilejobconfig {
@bean
public job exportordersjob() {
return new jobbuilder("exportordersjob", jobrepository)
.start(exportordersstep())
.build();
}
@bean
public step exportordersstep() {
return new stepbuilder("exportordersstep", jobrepository)
.<order, orderoutput>chunk(500, transactionmanager)
.reader(orderitemreader())
.processor(orderitemprocessor())
.writer(orderitemwriter())
.build();
}
@bean
public jdbcpagingitemreader<order> orderitemreader() {
return new jdbcpagingitemreaderbuilder<order>()
.name("orderitemreader")
.datasource(datasource)
.queryprovider(new pagingqueryprovider() {
@override
public void init(datasource datasource) {}
@override
public string getsortkey() {
return "id";
}
@override
public string getselectclause() {
return "select id, user_id, total_amount, status, created_at";
}
@override
public string getfromclause() {
return "from orders";
}
@override
public string getwhereclause() {
return "where created_at >= :startdate and created_at <= :enddate";
}
})
.parametervalues(map.of("startdate", startdate, "enddate", enddate))
.pagesize(1000)
.rowmapper(new orderrowmapper())
.build();
}
@bean
public flatfileitemwriter<orderoutput> orderitemwriter() {
return new flatfileitemwriterbuilder<orderoutput>()
.name("orderitemwriter")
.resource(new filesystemresource("output/orders.csv"))
.delimited()
.delimiter(",")
.names("orderid", "userid", "amount", "status", "createddate")
.headercallback(writer -> writer.write("orderid,userid,amount,status,createddate"))
.footercallback(writer -> writer.write("total records exported"))
.build();
}
}
三、复杂任务处理
3.1 多步骤任务
// =========================================================================
// Multi-step order-processing job with conditional error handling and a
// parallel split flow.
// FIX(review): restored Java casing; corrected .on("failed") to .on("FAILED")
// — ExitStatus pattern matching is case-sensitive and the framework's failure
// exit code is the upper-case "FAILED"; StepContext.getJobParameters()
// returns a Map rather than a JobParameters object, so the tasklet now goes
// through getStepExecution(); declared the paymentService, jobRepository and
// transactionManager dependencies that were used but never defined.
// =========================================================================
@Configuration
public class ComplexBatchJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // Used by processPaymentStep() but never declared in the original listing.
    @Autowired
    private PaymentService paymentService;

    /**
     * validate -> payment -> inventory -> notification. A FAILED notification
     * step routes to error handling; any other outcome proceeds to cleanup.
     */
    @Bean
    public Job orderProcessingJob() {
        return new JobBuilder("orderProcessingJob", jobRepository)
                .start(validateOrderStep())
                .next(processPaymentStep())
                .next(updateInventoryStep())
                .next(sendNotificationStep())
                .on("FAILED").to(errorHandlingStep())
                .from(sendNotificationStep()).on("*").to(cleanupStep())
                .end()
                .build();
    }

    /** Chunk step: validate pending orders in batches of 100. */
    @Bean
    public Step validateOrderStep() {
        return new StepBuilder("validateOrderStep", jobRepository)
                .<Order, ValidatedOrder>chunk(100, transactionManager)
                .reader(pendingOrderReader())
                .processor(orderValidator())
                .writer(validatedOrderWriter())
                .build();
    }

    /** Tasklet step: one-shot batch payment call keyed by the batchId job parameter. */
    @Bean
    public Step processPaymentStep() {
        return new StepBuilder("processPaymentStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    // StepContext.getJobParameters() yields Map<String,Object>;
                    // the typed JobParameters lives on the StepExecution.
                    JobParameters params = chunkContext.getStepContext()
                            .getStepExecution().getJobParameters();
                    String batchId = params.getString("batchId");
                    paymentService.processBatchPayments(batchId);
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }

    /** Chunk step: apply inventory updates for order items, 200 per transaction. */
    @Bean
    public Step updateInventoryStep() {
        return new StepBuilder("updateInventoryStep", jobRepository)
                .<OrderItem, InventoryUpdate>chunk(200, transactionManager)
                .reader(orderItemReader())
                .processor(inventoryProcessor())
                .writer(inventoryWriter())
                .build();
    }

    /** Runs flow1/flow2/flow3 in parallel on the given task executor. */
    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
                .split(taskExecutor())
                // flow2()/flow3() are presumably defined elsewhere — TODO confirm
                .add(flow1(), flow2(), flow3())
                .build();
    }
@bean
public flow flow1() {
return new flowbuilder<simple
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论