当前位置: 代码网 > it编程>编程语言>Java > Spring Boot 整合 Apache Flink 的详细过程

Spring Boot 整合 Apache Flink 的详细过程

2025年06月06日 Java 我要评论
spring boot 整合 apache flink 教程一、背景与目标apache flink 是一个高性能的分布式流处理框架,而spring boot提供了快速构建企业级应用的能力。整合二者可实

spring boot 整合 apache flink 教程

一、背景与目标

apache flink 是一个高性能的分布式流处理框架,而spring boot提供了快速构建企业级应用的能力。整合二者可实现:

  • 利用spring boot的依赖注入、配置管理等功能简化flink作业开发
  • 构建完整的微服务架构,将流处理嵌入spring生态
  • 实现动态作业提交与管理

二、环境准备

  • jdk 17+
  • maven 3.8+
  • spring boot 3.1.5
  • flink 1.17.2

三、创建项目 & 添加依赖

1. 创建spring boot项目

使用spring initializr生成基础项目,选择:

  • maven
  • spring web(可选,用于创建rest接口)

2. 添加flink依赖

<!-- pom.xml -->
<dependencies>
    <!-- spring boot starter -->
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <!-- flink核心依赖 -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-java</artifactid>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-streaming-java</artifactid>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- 本地执行时需添加 -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-runtime</artifactid>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

四、基础整合示例

1. 编写flink流处理作业

// src/main/java/com/example/demo/flink/wordcountjob.java
import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.util.collector;
public class wordcountjob {
    public static void execute() throws exception {
        final streamexecutionenvironment env = 
            streamexecutionenvironment.getexecutionenvironment();
        datastream<string> text = env.fromelements(
            "spring boot整合flink",
            "flink实时流处理",
            "spring生态集成"
        );
        datastream<wordcount> counts = text
            .flatmap(new flatmapfunction<string, wordcount>() {
                @override
                public void flatmap(string value, collector<wordcount> out) {
                    for (string word : value.split("\\s")) {
                        out.collect(new wordcount(word, 1l));
                    }
                }
            })
            .keyby(value -> value.word)
            .sum("count");
        counts.print();
        env.execute("spring boot flink job");
    }
    public static class wordcount {
        public string word;
        public long count;
        public wordcount() {}
        public wordcount(string word, long count) {
            this.word = word;
            this.count = count;
        }
        @override
        public string tostring() {
            return word + " : " + count;
        }
    }
}

2. 在spring boot中启动作业

// src/main/java/com/example/demo/demoapplication.java
@springbootapplication
public class demoapplication implements commandlinerunner {
    public static void main(string[] args) {
        springapplication.run(demoapplication.class, args);
    }
    @override
    public void run(string... args) throws exception {
        wordcountjob.execute(); // 启动flink作业
    }
}

五、进阶整合 - 通过rest api动态提交作业

1. 创建job提交服务

// src/main/java/com/example/demo/service/flinkjobservice.java
@service
public class flinkjobservice {
    public string submitwordcountjob(list<string> inputlines) {
        try {
            final streamexecutionenvironment env = 
                streamexecutionenvironment.getexecutionenvironment();
            datastream<string> text = env.fromcollection(inputlines);
            // ...(同上wordcount逻辑)
            jobexecutionresult result = env.execute();
            return "jobid: " + result.getjobid();
        } catch (exception e) {
            return "job failed: " + e.getmessage();
        }
    }
}

2. 创建rest控制器

// src/main/java/com/example/demo/controller/jobcontroller.java
@restcontroller
@requestmapping("/jobs")
public class jobcontroller {
    @autowired
    private flinkjobservice flinkjobservice;
    @postmapping("/wordcount")
    public string submitwordcount(@requestbody list<string> inputs) {
        return flinkjobservice.submitwordcountjob(inputs);
    }
}

六、关键配置说明

1. application.properties

# 设置flink本地执行环境
spring.flink.local.enabled=true
spring.flink.job.name=springbootflinkjob
# 调整并行度(根据cpu核心数)
spring.flink.parallelism=4

2. 解决依赖冲突

在pom.xml中排除冲突依赖:

<dependency>
    <groupid>org.apache.flink</groupid>
    <artifactid>flink-core</artifactid>
    <version>1.17.2</version>
    <exclusions>
        <exclusion>
            <groupid>log4j</groupid>
            <artifactid>log4j</artifactid>
        </exclusion>
    </exclusions>
</dependency>

七、运行与验证

启动spring boot应用:

mvn spring-boot:run

调用api提交作业:

curl -x post -h "content-type: application/json" \
-d '["hello flink", "spring boot integration"]' \
http://localhost:8080/jobs/wordcount

查看控制台输出:

flink> spring : 1
flink> boot : 1
flink> integration : 1
...

八、生产环境注意事项

集群部署:将打包后的jar提交到flink集群

flink run -c com.example.demo.demoapplication your-application.jar

状态管理:集成flink state backend(如rocksdb)

监控集成:通过micrometer接入spring boot actuator

资源隔离:使用yarnkubernetes部署模式

九、完整项目结构

src/
├── main/
│   ├── java/
│   │   ├── com/example/demo/
│   │   │   ├── demoapplication.java
│   │   │   ├── flink/
│   │   │   │   └── wordcountjob.java
│   │   │   ├── controller/
│   │   │   ├── service/
│   ├── resources/
│   │   └── application.properties
pom.xml

通过以上步骤,即可实现spring boot与apache flink的深度整合。这种架构特别适合需要将实时流处理能力嵌入微服务体系的场景,如实时风控系统、iot数据处理平台等。后续可扩展集成kafka、hbase等大数据组件。

到此这篇关于spring boot 整合 apache flink 教程的文章就介绍到这了,更多相关spring boot 整合 apache flink内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com