spring batch批量处理数据
spring batch 是一个由 pivotal software(原 springsource,现属于 vmware)开发的批处理框架,它是 spring 框架的一部分,主要用于创建高效、健壮的批量数据处理应用。
spring batch 设计用于处理大量的记录,例如在夜间处理或定期运行的数据加载、转换和整合操作。
spring batch的主要特性包括:
- 事务管理:支持事务边界内的数据处理,确保数据完整性。
- 并发处理:允许并行处理数据,提高处理速度。
- 重试机制:当出现故障时,可以配置重试策略以重新处理失败的记录。
- 跳过机制:能够跳过某些失败的记录而不中断整个批处理作业。
- 持久化状态管理:使用
jobrepository来跟踪作业的状态,即使在系统重启后也能恢复作业。 - 分片/分区:可以将数据集分割成小块,并在多个处理器上并行处理。
- 远程执行:支持跨机器的作业执行。
- 监控和日志:提供详细的日志记录和作业执行的监控能力。
spring batch的架构包括以下几个核心组件:
- job:这是批处理作业的最高级别抽象,可以包含一个或多个步骤。
- step:是批处理作业中的一个逻辑单元,可以是任务步骤(如读取、处理、写入数据)或决策步骤。
- itemreader:负责从数据源读取数据项。
- itemprocessor:对读取的数据项进行处理。
- itemwriter:将处理后的数据写入目标数据源。
- joblauncher:负责启动和执行作业。
- jobrepository:管理作业的元数据和状态,通常与数据库交互。
spring batch 不是一个调度框架,它专注于批处理作业的实现细节,通常需要与其他调度框架(如 quartz 或 cron)结合使用,以便控制作业何时启动。由于其高度的可配置性和灵活性,spring batch 成为了企业级批处理应用的首选框架之一。
在spring boot项目中集成spring batch涉及几个关键步骤,下面举个例子,说明如何设置一个基本的spring batch环境:
1. 添加依赖
首先,在pom.xml文件中添加spring batch和spring boot starter batch的依赖:
<dependencies>
<!-- spring batch -->
<dependency>
<groupid>org.springframework.batch</groupid>
<artifactid>spring-batch-core</artifactid>
<version>4.x.y.release</version> <!-- 使用最新稳定版 -->
</dependency>
<!-- spring boot starter batch -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-batch</artifactid>
</dependency>
<!-- 数据库连接池 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-jdbc</artifactid>
</dependency>
<!-- 数据库驱动 -->
<dependency>
<groupid>com.mysql</groupid>
<artifactid>mysql-connector-java</artifactid>
</dependency>
<!-- 如果使用h2作为内存数据库 -->
<dependency>
<groupid>com.h2database</groupid>
<artifactid>h2</artifactid>
</dependency>
</dependencies>
2. 配置数据源和jobrepository
spring batch需要一个数据源来存储作业元数据和状态。
这通常通过application.properties或application.yml文件配置:
spring.datasource.url=jdbc:mysql://localhost:3306/batchdb spring.datasource.username=batchuser spring.datasource.password=batchpassword spring.datasource.driver-class-name=com.mysql.cj.jdbc.driver # spring batch配置 spring.batch.job.enabled=false # 设置为false,避免在启动时自动执行任何job
3. 创建job和step
定义一个job,并为其创建一个或多个step。
这通常通过一个@configuration类和@enablebatchprocessing注解完成:
@configuration
@enablebatchprocessing
public class batchconfig {
@autowired
private jobbuilderfactory jobbuilderfactory;
@autowired
private stepbuilderfactory stepbuilderfactory;
@bean
public job importuserjob() {
return jobbuilderfactory.get("importuserjob")
.incrementer(new runidincrementer())
.flow(importuserdatastep())
.end()
.build();
}
@bean
public step importuserdatastep() {
return stepbuilderfactory.get("importuserdatastep")
.<user, user>chunk(10)
.reader(useritemreader(null))
.processor(useritemprocessor())
.writer(useritemwriter())
.build();
}
}
4. 实现itemreader, itemprocessor, 和 itemwriter
在上面的示例中,importuserdatastep()使用chunk-oriented步骤,这意味着它将数据分批处理。
你需要实现itemreader, itemprocessor, 和 itemwriter来分别读取、处理和写入数据:
@bean
public flatfileitemreader<user> useritemreader(resource resource) {
defaultlinemapper<user> linemapper = new defaultlinemapper<>();
delimitedlinetokenizer tokenizer = new delimitedlinetokenizer();
tokenizer.setnames("firstname", "lastname");
beanwrapperfieldsetmapper<user> fieldsetmapper = new beanwrapperfieldsetmapper<>();
fieldsetmapper.settargettype(user.class);
linemapper.setlinetokenizer(tokenizer);
linemapper.setfieldsetmapper(fieldsetmapper);
flatfileitemreader<user> itemreader = new flatfileitemreader<>();
itemreader.setresource(resource);
itemreader.setlinestoskip(1); // 跳过标题行
itemreader.setlinemapper(linemapper);
return itemreader;
}
@bean
public itemprocessor<user, user> useritemprocessor() {
return new itemprocessor<user, user>() {
@override
public user process(user item) throws exception {
item.setfirstname(item.getfirstname().touppercase());
return item;
}
};
}
@bean
public jpapagingitemwriter<user> useritemwriter(jpaitemwriterbuilder<user> builder) {
return builder
.entitymanagerfactory(entitymanagerfactory)
.build();
}
5. 启动job
在你的主类中,你可以注入joblauncher和job,然后调用它们来启动作业:
@autowired
private joblauncher joblauncher;
@autowired
private job importuserjob;
// 在适当的地方调用
joblauncher.run(importuserjob, new jobparametersbuilder().addlong("time", system.currenttimemillis()).tojobparameters());
总结
以上步骤会帮助你在一个spring boot项目中集成spring batch。请注意,实际的配置可能需要根据你的具体需求和环境进行调整。
这些仅为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论