当前位置: 代码网 > it编程>编程语言>Java > Spring Batch大数据量处理之从入门到精通实践

Spring Batch大数据量处理之从入门到精通实践

2026年04月18日 Java 我要评论
摘要:本文介绍 Spring Batch 的基础架构配置(JobRepository、JobLauncher、JobRegistry)、简单任务示例(CSV 导入数据库、数据库导出到文件)以及复杂多步骤任务的实现实践。

一、spring batch 基础架构

1.1 核心配置

/**
 * Core Spring Batch infrastructure configuration.
 *
 * <p>Declares an asynchronous {@link JobLauncher}, a {@link JobExplorer} for
 * querying batch metadata, and a {@link JobRegistry} so jobs can be looked up
 * by name at runtime.
 *
 * <p>NOTE(review): the original article's code was published fully lowercased
 * (a site-extraction artifact) and could not compile; identifier casing has
 * been restored to standard Spring Batch 5 names.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Asynchronous launcher: jobs run on a {@link SimpleAsyncTaskExecutor}
     * thread so the caller (e.g. an HTTP request) is not blocked for the
     * duration of the job.
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    /**
     * Read-only access to the batch metadata tables (job instances,
     * executions, step executions).
     */
    @Bean
    public JobExplorer jobExplorer(DataSource dataSource) throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    /** In-memory registry mapping job names to job instances. */
    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    /**
     * Automatically registers every {@link Job} bean in the context with the
     * {@link JobRegistry} as it is created.
     */
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }
}

1.2 数据库表结构

-- Spring Batch metadata tables (created by the framework's schema scripts):
-- batch_job_instance: job instances
-- batch_job_execution: job executions
-- batch_job_execution_params: job parameters
-- batch_step_execution: step executions
-- batch_job_execution_context: job execution context
-- batch_step_execution_context: step execution context

-- Custom monitoring table: one row per job execution, denormalized for
-- dashboard queries (avoids joining the framework's metadata tables).
create table batch_job_monitoring (
    id bigint primary key auto_increment,
    job_name varchar(100) not null,
    job_instance_id bigint,              -- FK-like ref to batch_job_instance
    job_execution_id bigint,             -- FK-like ref to batch_job_execution
    start_time timestamp,
    end_time timestamp,
    status varchar(20),                  -- e.g. COMPLETED / FAILED
    read_count bigint default 0,
    write_count bigint default 0,
    skip_count bigint default 0,
    error_message text,
    created_at timestamp default current_timestamp
);

二、简单任务示例

2.1 csv 导入数据库

/**
 * Job definition: bulk-load products from a CSV file into the database.
 *
 * <p>Reads {@code input/products.csv} in 1000-item chunks, maps each row to a
 * {@link Product}, and upserts into the {@code products} table. The step is
 * fault tolerant: up to 10 validation failures are skipped and transient data
 * access errors are retried up to 3 times.
 *
 * <p>Fixes vs. the original article code: the {@code dataSource} field used by
 * {@link #databaseItemWriter()} was never declared; the SQL named parameter
 * {@code :createdat} did not match the bean property {@code createdAt} required
 * by {@link BeanPropertyItemSqlParameterSourceProvider}; and the two listener
 * beans were referenced but never defined.
 */
@Configuration
public class CsvToDatabaseJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // Was missing in the original; databaseItemWriter() references it.
    @Autowired
    private DataSource dataSource;

    @Bean
    public Job csvToDatabaseJob() {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
            .start(csvToDatabaseStep())
            .listener(jobExecutionListener())
            .build();
    }

    @Bean
    public Step csvToDatabaseStep() {
        return new StepBuilder("csvToDatabaseStep", jobRepository)
            .<ProductInput, Product>chunk(1000, transactionManager)
            .reader(csvItemReader())
            .processor(productItemProcessor())
            .writer(databaseItemWriter())
            .faultTolerant()
            .skipLimit(10)
            .skip(ValidationException.class)
            .retryLimit(3)
            .retry(TransientDataAccessException.class)
            .listener(stepExecutionListener())
            .build();
    }

    @Bean
    public FlatFileItemReader<ProductInput> csvItemReader() {
        return new FlatFileItemReaderBuilder<ProductInput>()
            .name("csvItemReader")
            .resource(new FileSystemResource("input/products.csv"))
            .delimited()
            .names("id", "name", "description", "price", "category")
            .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                setTargetType(ProductInput.class);
            }})
            .linesToSkip(1) // skip the header row
            .build();
    }

    /** Maps a raw CSV record to the persistent {@link Product} entity. */
    @Bean
    public ItemProcessor<ProductInput, Product> productItemProcessor() {
        return input -> {
            Product product = new Product();
            product.setId(input.getId());
            product.setName(input.getName().trim());
            product.setDescription(input.getDescription());
            // String-based BigDecimal constructor keeps the exact decimal value.
            product.setPrice(new BigDecimal(input.getPrice()));
            product.setCategory(input.getCategory());
            product.setCreatedAt(LocalDateTime.now());
            return product;
        };
    }

    /**
     * Upsert writer (MySQL {@code on duplicate key update}). Named parameters
     * must match bean property names exactly — hence {@code :createdAt}.
     */
    @Bean
    public JdbcBatchItemWriter<Product> databaseItemWriter() {
        return new JdbcBatchItemWriterBuilder<Product>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("insert into products (id, name, description, price, category, created_at) " +
                 "values (:id, :name, :description, :price, :category, :createdAt) " +
                 "on duplicate key update " +
                 "name = values(name), description = values(description), " +
                 "price = values(price), category = values(category)")
            .dataSource(dataSource)
            .build();
    }

    /** Minimal no-op job listener; hook point for the monitoring table. */
    @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                // no-op: extend to record job start in batch_job_monitoring
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                // no-op: extend to record job completion/failure
            }
        };
    }

    /** Minimal step listener; passes the step's own exit status through. */
    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StepExecutionListener() {
            @Override
            public void beforeStep(StepExecution stepExecution) {
                // no-op
            }

            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                return stepExecution.getExitStatus();
            }
        };
    }
}

2.2 数据库导出到文件

@configuration
public class databasetofilejobconfig {

    @bean
    public job exportordersjob() {
        return new jobbuilder("exportordersjob", jobrepository)
            .start(exportordersstep())
            .build();
    }

    @bean
    public step exportordersstep() {
        return new stepbuilder("exportordersstep", jobrepository)
            .<order, orderoutput>chunk(500, transactionmanager)
            .reader(orderitemreader())
            .processor(orderitemprocessor())
            .writer(orderitemwriter())
            .build();
    }

    @bean
    public jdbcpagingitemreader<order> orderitemreader() {
        return new jdbcpagingitemreaderbuilder<order>()
            .name("orderitemreader")
            .datasource(datasource)
            .queryprovider(new pagingqueryprovider() {
                @override
                public void init(datasource datasource) {}
                
                @override
                public string getsortkey() {
                    return "id";
                }
                
                @override
                public string getselectclause() {
                    return "select id, user_id, total_amount, status, created_at";
                }
                
                @override
                public string getfromclause() {
                    return "from orders";
                }
                
                @override
                public string getwhereclause() {
                    return "where created_at >= :startdate and created_at <= :enddate";
                }
            })
            .parametervalues(map.of("startdate", startdate, "enddate", enddate))
            .pagesize(1000)
            .rowmapper(new orderrowmapper())
            .build();
    }

    @bean
    public flatfileitemwriter<orderoutput> orderitemwriter() {
        return new flatfileitemwriterbuilder<orderoutput>()
            .name("orderitemwriter")
            .resource(new filesystemresource("output/orders.csv"))
            .delimited()
            .delimiter(",")
            .names("orderid", "userid", "amount", "status", "createddate")
            .headercallback(writer -> writer.write("orderid,userid,amount,status,createddate"))
            .footercallback(writer -> writer.write("total records exported"))
            .build();
    }
}

三、复杂任务处理

3.1 多步骤任务

@configuration
public class complexbatchjobconfig {

    /**
     * Multi-step order pipeline: validate → pay → update inventory → notify,
     * with a conditional branch to error handling when the notification step
     * exits with FAILED, and a cleanup step in every other case.
     *
     * <p>Fix vs. the original: Spring Batch exit-status codes are uppercase —
     * {@code on("failed")} would never match; changed to {@code "FAILED"}.
     */
    @Bean
    public Job orderProcessingJob() {
        return new JobBuilder("orderProcessingJob", jobRepository)
            .start(validateOrderStep())
            .next(processPaymentStep())
            .next(updateInventoryStep())
            .next(sendNotificationStep())
            .on("FAILED").to(errorHandlingStep())
            .from(sendNotificationStep()).on("*").to(cleanupStep())
            .end()
            .build();
    }

    /** Chunked validation step: pending orders in, validated orders out. */
    @Bean
    public Step validateOrderStep() {
        return new StepBuilder("validateOrderStep", jobRepository)
            .<Order, ValidatedOrder>chunk(100, transactionManager)
            .reader(pendingOrderReader())
            .processor(orderValidator())
            .writer(validatedOrderWriter())
            .build();
    }

    /**
     * Tasklet step that triggers batch payment processing for the batch id
     * passed in the job parameters.
     *
     * <p>Fix vs. the original: {@code StepContext.getJobParameters()} returns
     * {@code Map<String, Object>}, not {@code JobParameters}, so the original
     * {@code params.getString(...)} call could not compile.
     */
    @Bean
    public Step processPaymentStep() {
        return new StepBuilder("processPaymentStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // Payment processing logic
                Map<String, Object> params =
                    chunkContext.getStepContext().getJobParameters();
                String batchId = (String) params.get("batchId");

                paymentService.processBatchPayments(batchId);

                return RepeatStatus.FINISHED;
            }, transactionManager)
            .build();
    }

    /** Chunked inventory step: order items in, inventory updates out. */
    @Bean
    public Step updateInventoryStep() {
        return new StepBuilder("updateInventoryStep", jobRepository)
            .<OrderItem, InventoryUpdate>chunk(200, transactionManager)
            .reader(orderItemReader())
            .processor(inventoryProcessor())
            .writer(inventoryWriter())
            .build();
    }

    /**
     * Runs three sub-flows in parallel on the configured task executor; the
     * split completes only when all three flows finish.
     */
    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
            .split(taskExecutor())
            .add(flow1(), flow2(), flow3())
            .build();
    }

    @bean
    public flow flow1() {
        return new flowbuilder<simple

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com