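Flink is a unified computing framework for both batch and stream processing. At its core is a streaming dataflow engine that handles data distribution and parallel computation.
Its main strength is stream processing, and it is best suited to low-latency data processing.
Typical scenario: processing data in high-concurrency pipelines with millisecond-level latency while remaining reliable.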
Environment setup:
① Install Flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
② Install netcat
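netcat (also known as nc) is a networking utility that can open TCP/IP or UDP connections between two machines.
It is used to test ports on a network, transfer files, and debug and probe networks,
and it can also be used for advanced operations such as encrypted connections and remote administration.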
yum install -y nc # install the nc command
nc -lk 8888 # listen on port 8888 (-l: listen mode; -k: keep listening after a client disconnects)
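On machines without nc (for example Windows), a small JDK-only program can play the same role. The sketch below is a hypothetical stand-in (the class LineServer and its forward method are illustrative names, not part of Flink or netcat). It behaves like nc -l without -k: it accepts a single client, such as Flink's socketTextStream, and forwards each console line to it terminated by "\n", which is the default record delimiter of socketTextStream(host, port).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical JDK-only stand-in for `nc -l <port>`: waits for one client
 * (e.g. Flink's socketTextStream) and forwards input lines to it.
 */
class LineServer {

    // Accepts a single client, then copies every line from `source` to it.
    static void forward(ServerSocket server, BufferedReader source) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = source.readLine()) != null) {
                out.write(line);
                out.write('\n'); // socketTextStream splits records on "\n" by default
                out.flush();     // send each line right away, like typing into nc
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port + ", end input with Ctrl+D (Ctrl+Z then Enter on Windows)");
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
            forward(server, console);
        }
    }
}
```

Start it with the port as its only argument (8888 if omitted), then start the Flink job and type lines into the console. Unlike nc -lk, it exits once the first client disconnects or the console input ends.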
Unbounded stream: reading a socket text stream
1. Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. SocketJob
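package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketJob {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the default parallelism (locally it otherwise defaults to the number of CPU cores/threads)
        env.setParallelism(3);
        // Read a socket text stream from the given IP and port; records only flow once data arrives on the socket
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
        // Process the data: split, transform, group and aggregate to get the word counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                // Override the default parallelism for this operator only
                .setParallelism(2)
                // Provide the type information explicitly: because of type erasure, for a lambda passed to flatMap
                // Flink can at best see a raw Tuple2, not Tuple2<String, Integer>; declaring the return type lets it resolve the full type
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                // Equivalent alternative: .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        // Print the results
        sum.print();
        // Execute the job
        env.execute();
    }
}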
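The comment on .returns(...) comes down to Java type erasure. The Flink-free sketch below illustrates it with two hypothetical classes: Pair stands in for Tuple2, and TypeToken is a simplified version of the general "super type token" trick that TypeHint is used with (note the trailing {} in new TypeHint<...>() { }). Objects and lambdas do not carry their generic arguments at runtime, while an anonymous subclass records its parameterized superclass in its class file, so the full type can be read back.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

/** Hypothetical two-field holder standing in for Flink's Tuple2. */
final class Pair<A, B> {
    final A f0;
    final B f1;

    Pair(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

/** Hypothetical super type token: reads T from the anonymous subclass's generic superclass. */
abstract class TypeToken<T> {
    final Type type;

    protected TypeToken() {
        ParameterizedType superclass = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superclass.getActualTypeArguments()[0];
    }
}

class ErasureDemo {
    public static void main(String[] args) {
        // Generic arguments are erased at runtime: both pairs share one Class object.
        boolean sameClass = new Pair<String, Integer>("abc", 1).getClass() == new Pair<Long, Long>(1L, 2L).getClass();
        System.out.println("same runtime class: " + sameClass);

        // A lambda's generated class does not record Function<String, Pair<String, Integer>>,
        // so its interface is reported as the raw Function type.
        Function<String, Pair<String, Integer>> toPair = word -> new Pair<>(word, 1);
        Type lambdaInterface = toPair.getClass().getGenericInterfaces()[0];
        System.out.println("lambda keeps type arguments: " + (lambdaInterface instanceof ParameterizedType));

        // An anonymous subclass ("{}") keeps the full type argument.
        TypeToken<Pair<String, Integer>> token = new TypeToken<Pair<String, Integer>>() {};
        System.out.println("captured type: " + token.type);
    }
}
```

Leaving out the {} would not compile here because TypeToken is abstract, which mirrors why the SocketJob code needs the anonymous-class form of TypeHint.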
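Testing:
Start the socket stream:
nc -l 8888
Local execution: run the main method directly in IntelliJ IDEA, then type the following lines into the socket stream: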
abc bcd cde
bcd cde fgh
cde fgh hij
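To see what the job should print for the three lines above, here is a plain-Java model of flatMap, keyBy(f0) and sum(1). The class RunningWordCount is hypothetical and not Flink code: each word updates its key's running total and emits the new (word, count) pair in the same format as Flink's Tuple2.toString(). In the real job, print() also prefixes each line with the subtask number (e.g. "2> ") because the parallelism is greater than 1, and updates for different keys may interleave in another order; for any single key, the counts still rise one step at a time (cde: 1, 2, 3).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Flink-free model of flatMap -> keyBy(word) -> sum(1):
 * every word updates its running total and emits the new (word,total) pair.
 */
class RunningWordCount {
    private final Map<String, Integer> totals = new HashMap<>();

    // Processes one socket line and returns the updates the keyed sum would emit, in order.
    List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            int total = totals.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        String[] input = {"abc bcd cde", "bcd cde fgh", "cde fgh hij"};
        for (String line : input) {
            counter.onLine(line).forEach(System.out::println);
        }
    }
}
```

Running main prints (abc,1), (bcd,1), (cde,1), (bcd,2), (cde,2), (fgh,1), (cde,3), (fgh,2), (hij,1), one per line.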
Cluster execution:
Build the project with Maven (mvn package) and upload the packaged jar to the cluster.