当前位置: 代码网 > it编程>编程语言>Java > SpringBoot集成flink

SpringBoot集成flink

2024年07月28日 Java 我要评论
flink

flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
最大亮点是流处理,最适合的应用场景是低时延的数据处理。
场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

环境搭建:

①、安装flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

②、安装netcat

netcat(又称为nc)是一个计算机网络工具,它可以在两台计算机之间建立 tcp/ip 或 udp 连接。
用于测试网络中的端口,发送文件等操作。
进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作

yum install -y nc # 安装nc命令

nc -lk 8888 # 启动socket端口

无界流之读取socket文本流

一、依赖


<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactid>springboot-demo</artifactid>
        <groupid>com.et</groupid>
        <version>1.0-snapshot</version>
    </parent>
    <modelversion>4.0.0</modelversion>

    <artifactid>flink</artifactid>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-autoconfigure</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
        <!-- 添加 flink 依赖 -->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-streaming-java</artifactid>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-java</artifactid>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-clients</artifactid>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-connector-base</artifactid>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-connector-files</artifactid>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-connector-kafka</artifactid>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-runtime-web</artifactid>
            <version>1.17.0</version>
        </dependency>


    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupid>org.apache.maven.plugins</groupid>
                <artifactid>maven-shade-plugin</artifactid>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.appendingtransformer">
                                    <resource>meta-inf/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.propertiesmergingresourcetransformer">
                                    <resource>meta-inf/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.appendingtransformer">
                                    <resource>meta-inf/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.servicesresourcetransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.manifestresourcetransformer">
                                    <mainclass>com.et.flink.job.socketjob</mainclass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

二、soketjob

public class socketjob{
	
	public static void main(string[] args)throws exception{
		
		// 创建执行环境
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
        // 指定并行度,默认电脑线程数
        env.setparallelism(3);
        // 读取数据socket文本流 指定监听 ip 端口 只有在接收到数据才会执行任务
        datastreamsource<string> socketds = env.sockettextstream("172.24.4.193", 8888);

        // 处理数据: 切换、转换、分组、聚合 得到统计结果
        singleoutputstreamoperator<tuple2<string, integer>> sum = socketds
                .flatmap(
                        (string value, collector<tuple2<string, integer>> out) -> {
                            string[] words = value.split(" ");
                            for (string word : words) {
                                out.collect(tuple2.of(word, 1));
                            }
                        }
                )
                .setparallelism(2)
                // // 显式地提供类型信息:对于flatmap传入lambda表达式,系统只能推断出返回的是tuple2类型,而无法得到tuple2<string, long>。只有显式设置系统当前返回类型,才能正确解析出完整数据
                .returns(new typehint<tuple2<string, integer>>() {
                })
//                .returns(types.tuple(types.string,types.int))
                .keyby(value -> value.f0)
                .sum(1);


        // 输出
        sum.print();

        // 执行
        env.execute();
	}
}

测试:

启动socket流:

nc -l 8888

本地执行:直接ideal启动main程序,在socket流中输入

abc bcd cde
bcd cde fgh
cde fgh hij

在这里插入图片描述

集群执行:
执行maven打包,将打包的jar上传到集群中
在这里插入图片描述

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com