Official documentation: First steps | Apache Flink
Flink download: https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
Java environment: 1.8.0_45
Extract the installation package:
[root@vm-9f-mysteel-dc-ebc-test03 opt]# tar -zxvf flink-1.14.4-bin-scala_2.11.tgz
flink-1.14.4/
flink-1.14.4/lib/
flink-1.14.4/lib/flink-dist_2.11-1.14.4.jar
flink-1.14.4/lib/flink-json-1.14.4.jar
flink-1.14.4/lib/flink-shaded-zookeeper-3.4.14.jar
flink-1.14.4/lib/flink-csv-1.14.4.jar
flink-1.14.4/lib/log4j-1.2-api-2.17.1.jar
flink-1.14.4/lib/flink-table_2.11-1.14.4.jar
flink-1.14.4/lib/log4j-slf4j-impl-2.17.1.jar
flink-1.14.4/lib/log4j-core-2.17.1.jar
flink-1.14.4/lib/log4j-api-2.17.1.jar
flink-1.14.4/README.txt
flink-1.14.4/licenses/
flink-1.14.4/licenses/LICENSE.grizzled-slf4j
flink-1.14.4/licenses/LICENSE.javax.activation
flink-1.14.4/licenses/LICENSE.antlr-java-grammar-files
flink-1.14.4/licenses/LICENSE.reflections
flink-1.14.4/licenses/LICENSE.bouncycastle
........................
Change into the bin directory and start Flink:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./start-cluster.sh
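Open the web UI: http://192.168.201.143:8081
Run the following command to generate the Flink project:
d:\apache-maven-3.6.0\bin\mvn.cmd archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2
Once the project has been generated, import it into IDEA. The project structure is as follows:
Write the stream-processing class: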
package com.fen;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // Check the arguments: a hostname and a port are required
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostname = args[0];
        int port = Integer.parseInt(args[1]);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read lines of text from the socket
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // Count: split lines into (word, 1) pairs, key by the word (field 0), sum the counts (field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        // Sink: print the running counts
        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    // Splits each line into lower-case words and emits one (word, 1) tuple per word
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            // Split on runs of non-word characters
            String[] tokens = s.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
The source of this streaming job is a socket and the sink is print. The job counts how many times each word occurs in the data it receives in real time.
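To make the tokenizing step concrete, here is a minimal plain-Java sketch of what LineSplitter is meant to do with one line of input, assuming the split pattern is `\\W+` (runs of non-word characters). The class name TokenizeSketch and the method tokenize are made up for illustration and are not part of Flink or of the generated project.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the tokenizing step of LineSplitter (no Flink dependency).
// TokenizeSketch and tokenize are hypothetical names used only for this illustration.
class TokenizeSketch {

    // Lower-cases the line, splits on runs of non-word characters, and drops empty tokens.
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello, Flink! hello")); // [hello, flink, hello]
        System.out.println(tokenize("  leading spaces"));    // [leading, spaces]
    }
}
```

The length check matters because String.split keeps an empty leading token when the line starts with a separator, as in the second call.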
Package the project:
Upload the jar to the server where Flink is installed:
Install nc on Linux:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# yum install nc
Use nc to start a socket server:
[root@vm-9f-mysteel-dc-ebc-test03 ~]# nc -l 127.0.0.1 8888
Run the Flink job:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink run -c com.fen.StreamingJob ./com.fen-1.0-SNAPSHOT.jar 127.0.0.1 8888
The streaming job stays in the RUNNING state:
Now let's look at how the real-time data is counted:
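As a rough illustration of what to expect when lines are typed into the nc session, the sketch below simulates the keyed running sum in plain Java: every incoming word produces one updated (word, count) record. It is a simulation under stated assumptions, not Flink code. RunningCountSketch and onLine are hypothetical names, and the HashMap stands in for the per-key state that Flink manages itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed running sum (no Flink dependency).
// RunningCountSketch and onLine are hypothetical names used only for this illustration.
class RunningCountSketch {

    // Running count per word; stands in for Flink's per-key state
    private final Map<String, Integer> counts = new HashMap<>();

    // Processes one input line and returns the records emitted for it, one per word
    List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.isEmpty()) {
                continue;
            }
            int updated = counts.merge(token, 1, Integer::sum);
            emitted.add("(" + token + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningCountSketch job = new RunningCountSketch();
        System.out.println(job.onLine("hello flink hello")); // [(hello,1), (flink,1), (hello,2)]
        System.out.println(job.onLine("flink"));             // [(flink,2)]
    }
}
```

Counts carry over from one line to the next, which is why the second call reports flink at 2. In a real cluster the records may interleave differently across keys, and the printed lines can carry a subtask prefix when the parallelism is greater than 1.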
1. Cancel the job from the web UI:
2. Cancel the job from the command line:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink list
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Waiting for response...
------------------ Running/Restarting Jobs -------------------
13.12.2022 19:17:49 : c99f75881ce909ef625c8311e0f5e575 : Java WordCount from SocketTextStream Example (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink cancel -m 127.0.0.1:8081 c99f75881ce909ef625c8311e0f5e575
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cancelling job c99f75881ce909ef625c8311e0f5e575.
Cancelled job c99f75881ce909ef625c8311e0f5e575.