
Flink Single-Node Deployment and Basic Usage

July 28, 2024 | Java

Official documentation: First steps | Apache Flink

Flink download: https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz

 

Java environment: 1.8.0_45

Extract the installation package:

[root@vm-9f-mysteel-dc-ebc-test03 opt]# tar -zxvf flink-1.14.4-bin-scala_2.11.tgz
flink-1.14.4/
flink-1.14.4/lib/
flink-1.14.4/lib/flink-dist_2.11-1.14.4.jar
flink-1.14.4/lib/flink-json-1.14.4.jar
flink-1.14.4/lib/flink-shaded-zookeeper-3.4.14.jar
flink-1.14.4/lib/flink-csv-1.14.4.jar
flink-1.14.4/lib/log4j-1.2-api-2.17.1.jar
flink-1.14.4/lib/flink-table_2.11-1.14.4.jar
flink-1.14.4/lib/log4j-slf4j-impl-2.17.1.jar
flink-1.14.4/lib/log4j-core-2.17.1.jar
flink-1.14.4/lib/log4j-api-2.17.1.jar
flink-1.14.4/readme.txt
flink-1.14.4/licenses/
flink-1.14.4/licenses/license.grizzled-slf4j
flink-1.14.4/licenses/license.javax.activation
flink-1.14.4/licenses/license.antlr-java-grammar-files
flink-1.14.4/licenses/license.reflections
flink-1.14.4/licenses/license.bouncycastle
........................

Change into the bin directory and start Flink:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./start-cluster.sh

Open the web UI: http://192.168.201.143:8081

Run the following command to generate the Flink project:

D:\apache-maven-3.6.0\bin\mvn.cmd archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2

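Once the project has been generated, import it into IDEA. The project structure is as follows:

Write the real-time processing class: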

package com.fen;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

	public static void main(String[] args) throws Exception {
		// Check arguments: a hostname and a port are required
		if (args.length != 2) {
			System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
			return;
		}
		String hostname = args[0];
		int port = Integer.parseInt(args[1]);

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Source: read text lines from the socket
		DataStreamSource<String> stream = env.socketTextStream(hostname, port);
		// Count: split lines into (word, 1) pairs, key by the word (field 0), sum the counts (field 1)
		SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
				.keyBy(0)
				.sum(1);
		// Sink: print the running counts
		sum.print();
		env.execute("java wordcount from sockettextstream example");
	}

	// Splits each line into lowercase words and emits a (word, 1) pair per word
	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		@Override
		public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
			// "\\W+" splits on runs of non-word characters
			String[] tokens = s.toLowerCase().split("\\W+");

			for (String token : tokens) {
				if (token.length() > 0) {
					collector.collect(new Tuple2<String, Integer>(token, 1));
				}
			}
		}
	}
}

The source of this real-time job is a socket and the sink is print. The job counts the occurrences of each word in the data it receives in real time.
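
To make the counting logic concrete, here is a minimal plain-Java sketch of what the split, key-by-word, and sum steps compute. It does not use Flink at all; the class name WordCountSketch and the method processLine are hypothetical names chosen for illustration, and the "(word,count)" strings only mimic how a Tuple2 is rendered when printed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class: mirrors the job's logic without any Flink dependency.
class WordCountSketch {

    // Running count per word, playing the role of the keyed sum state.
    private final Map<String, Integer> counts = new HashMap<>();

    // Feeds one input line and returns the updated counts, one entry per word occurrence.
    List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        // Same tokenization as LineSplitter: lowercase, split on non-word characters.
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                int updated = counts.merge(token, 1, Integer::sum);
                emitted.add("(" + token + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WordCountSketch job = new WordCountSketch();
        System.out.println(job.processLine("hello flink"));
        System.out.println(job.processLine("Hello, world"));
    }
}
```

Each incoming word produces an updated running total rather than a final result, which is why a word that appears again in a later line is reported with a higher count.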

Package the project:

 

Upload the JAR to the server where Flink is installed:

Install nc on Linux:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# yum install nc

Use nc to start a socket server:

[root@vm-9f-mysteel-dc-ebc-test03 ~]# nc -l 127.0.0.1  8888
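
For readers who want to see what happens on the wire, the following is a rough Java sketch of the two roles involved: a server that accepts one connection and sends newline-terminated lines (the part nc plays here), and a client that reads lines until the connection closes (roughly what a socket text source does). It is not how nc or Flink are implemented; LineSocketSketch, serveLines, readLines, and roundTrip are hypothetical names, and the sketch binds to a free port on 127.0.0.1 rather than to 8888.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class: a line-oriented socket server and client on loopback.
class LineSocketSketch {

    // Server role: accept one client, send each line followed by '\n', then close.
    static void serveLines(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
            }
            out.flush();
        }
    }

    // Client role: connect and collect lines until the server closes the connection.
    static List<String> readLines(String host, int port) throws IOException {
        List<String> received = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                received.add(line);
            }
        }
        return received;
    }

    // Runs the server in a background thread and the client in the calling thread.
    static List<String> roundTrip(List<String> lines) {
        // Port 0 asks the operating system for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"))) {
            Thread serverThread = new Thread(() -> {
                try {
                    serveLines(server, lines);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();
            List<String> received = readLines("127.0.0.1", server.getLocalPort());
            serverThread.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("hello flink", "hello world")));
    }
}
```

The server must already be listening before the client connects, which is why nc is started before the Flink job is submitted.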

Run the Flink job:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink run -c com.fen.StreamingJob  ./com.fen-1.0-SNAPSHOT.jar  127.0.0.1 8888

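The streaming job stays in the RUNNING state:

Now let's see how the real-time data is counted:

1. The job can be cancelled from the web UI, as shown on the following screen:

2. The job can also be cancelled from the command line: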

[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink list
slf4j: class path contains multiple slf4j bindings.
slf4j: found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/staticloggerbinder.class]
slf4j: found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/staticloggerbinder.class]
slf4j: see http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
slf4j: actual binding is of type [org.slf4j.impl.log4jloggerfactory]
waiting for response...
------------------ running/restarting jobs -------------------
13.12.2022 19:17:49 : c99f75881ce909ef625c8311e0f5e575 : java wordcount from sockettextstream example (running)
--------------------------------------------------------------
no scheduled jobs.
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink cancel -m 127.0.0.1:8081 c99f75881ce909ef625c8311e0f5e575
slf4j: class path contains multiple slf4j bindings.
slf4j: found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/staticloggerbinder.class]
slf4j: found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/staticloggerbinder.class]
slf4j: see http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
slf4j: actual binding is of type [org.slf4j.impl.log4jloggerfactory]
cancelling job c99f75881ce909ef625c8311e0f5e575.
cancelled job c99f75881ce909ef625c8311e0f5e575.
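
When cancelling from a script, the job ID has to be pulled out of the list output first. The sketch below shows one way to do that in Java, assuming the output has the same layout as the listing above (date, time, a 32-character hexadecimal job ID, then the job name, separated by " : "). The layout may differ in other Flink versions, and FlinkListParserSketch and jobIds are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class: extracts job IDs from text shaped like the listing above.
class FlinkListParserSketch {

    // Assumed line layout: "<date> <time> : <32 hex chars> : <job name> (<state>)".
    private static final Pattern JOB_LINE =
            Pattern.compile("^\\S+ \\S+ : ([0-9a-f]{32}) : .*$");

    // Returns every job ID found, in the order the lines appear.
    static List<String> jobIds(String output) {
        List<String> ids = new ArrayList<>();
        for (String line : output.split("\\r?\\n")) {
            Matcher m = JOB_LINE.matcher(line.trim());
            if (m.matches()) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        String sample = "waiting for response...\n"
                + "------------------ running/restarting jobs -------------------\n"
                + "13.12.2022 19:17:49 : c99f75881ce909ef625c8311e0f5e575 : "
                + "java wordcount from sockettextstream example (running)\n"
                + "--------------------------------------------------------------\n"
                + "no scheduled jobs.\n";
        System.out.println(jobIds(sample));
    }
}
```

Logging warnings and separator lines do not match the pattern, so only the job lines contribute IDs.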
