当前位置: 代码网 > 服务器>服务器>Linux > Flink入门,flink接入kafka数据源,消费数据并处理数据

Flink入门,flink接入kafka数据源,消费数据并处理数据

2024年08月04日 Linux 我要评论
构建时会提示找不到类,在idea中勾选如图选项,或者在pom文件中修改scope的值为compile。消费者可以收到消息,kafka安装完成,如果没有收到可以检查topic是否一致,ip是否正确。首先先下载kafka的linux版本,可以搜索阿里云的镜像进行下载,速度很快。将文件中的listeners注释去掉,并修改值为你虚拟机的ip,如下图。最后运行新的消费者,消费处理后的数据,只需修改topic。然后运行代码,在生产者生产一条数据进行查看。启动生产者服务,bin目录下。安装过程可自行搜索。

## 使用flink接收kafka数据,处理后发送到新topic

首先先下载kafka的linux版本,可以搜索阿里云的镜像进行下载,速度很快
http://mirrors.aliyun.com/apache/kafka/
安装过程可自行搜索。。。
注意,安装号kafka后需要修改配置文件

vi kafka的安装目录/config/server.properties

将文件中的listeners注释去掉,并修改值为你虚拟机的ip,如下图
192.168.80.108是我虚拟机的ip
然后启动kafka,这里要切换到bin目录下

sh kafka-server-start.sh -daemon ../config/server.properties

启动完可以切换到logs目录下查看日志
再打开两个shell页面,分别为生产和消费使用

vi server.log

启动生产者服务,bin目录下

./kafka-console-producer.sh --topic 自定义topic名称 --bootstrap-server 你虚拟机ip:9092

启动成功页面

启动消费者服务,bin目录下

./kafka-console-consumer.sh --topic 自定义topic名称 --bootstrap-server 你虚拟机ip:9092

启动成功页面
然后尝试在生成者输入字符,回车发送
在这里插入图片描述
在这里插入图片描述
消费者可以收到消息,kafka安装完成,如果没有收到可以检查topic是否一致,ip是否正确
下面引入flink相关依赖

<properties>
		<project.build.sourceencoding>utf-8</project.build.sourceencoding>
		<flink.version>1.19.0</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.12</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.17.1</log4j.version>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>apache development snapshot repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>
		<!-- apache flink依赖项 -->
		<!-- 之所以提供这些依赖项,是因为它们不应该打包到jar文件中. -->
		<dependency>
            <!--table api + datastream-->
			<groupid>org.apache.flink</groupid>
			<artifactid>flink-table-api-java-bridge</artifactid>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupid>org.apache.flink</groupid>
			<artifactid>flink-clients</artifactid>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

        <!--jdbc连接器-->
		<dependency>
			<groupid>org.apache.flink</groupid>
			<artifactid>flink-connector-jdbc</artifactid>
			<version>1.16.0</version>
		</dependency>

		<!-- 在这里添加连接器依赖项。它们必须在默认作用域(编译)中。 -->

		<dependency>
            <!--kafka连接器-->
			<groupid>org.apache.flink</groupid>
			<artifactid>flink-connector-kafka</artifactid>
			<version>1.16.0</version>
		</dependency>

		<!-- 添加日志框架,以便在ide中运行时生成控制台输出. -->
		<!-- 默认情况下,这些依赖项从应用程序jar中排除. -->
		<dependency>
			<groupid>org.apache.logging.log4j</groupid>
			<artifactid>log4j-slf4j-impl</artifactid>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupid>org.apache.logging.log4j</groupid>
			<artifactid>log4j-api</artifactid>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupid>org.apache.logging.log4j</groupid>
			<artifactid>log4j-core</artifactid>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupid>org.projectlombok</groupid>
			<artifactid>lombok</artifactid>
			<version>1.18.30</version>
			<scope>compile</scope>
		</dependency>
	</dependencies>

构建时会提示找不到类,在idea中勾选如图选项,或者在pom文件中修改scope的值为compile
在这里插入图片描述
下来开始写代码

        final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
        //设置并行数
        env.setparallelism(4);
        // 每10000毫秒进行一次checkpoint
        env.enablecheckpointing(10000);
        env.setstreamtimecharacteristic(timecharacteristic.eventtime);
        
        // 设置 kafka 消费者属性
        properties properties = new properties();
        properties.setproperty(consumerconfig.bootstrap_servers_config, "ip");
        properties.setproperty(consumerconfig.group_id_config, "flink-consumer-group");
        properties.setproperty("key.deserializer", stringdeserializer.class.getname());
        properties.setproperty("value.deserializer", stringdeserializer.class.getname());
        
        // 创建kafka消费者,将消费者添加到流
        flinkkafkaconsumer<string> consumer = new flinkkafkaconsumer<>("你设定的topic", new simplestringschema(), properties);
        //设置只读取最新数据
        consumer.setstartfromlatest();

        //添加数据源
        datastreamsource<string> source = env.addsource(consumer);

        source.print();

        datastream<string> mappedstream = source.map(new mapfunction<string, string>() {
            @override
            public string map(string value) throws exception {
                return value.touppercase(); //进行数据治理 例如,将值转换为大写
            }
        });

        //创建一个flink生产者,将处理过的数据发回去
        mappedstream.addsink(new flinkkafkaproducer<>("新的topic", new simplestringschema(), properties));
        
        env.execute("flink kafka integration");

上面这种构建kafka数据源的方式官方显示已经过时,有另一种构建方式

         kafkasource<string> source = kafkasource.<string>builder()
                .settopics("test")      
                .setgroupid("test-consumer-group")
                .setbootstrapservers("ip:9092")
                .setstartingoffsets(offsetsinitializer.latest())   //消费最新数据
                .setvalueonlydeserializer(new simplestringschema()).build();
        datastream<string> datastream = env.fromsource(source, watermarkstrategy.nowatermarks(), "kafka source");

然后运行代码,在生产者生产一条数据进行查看
生产者在这里插入图片描述
查看代码控制台在这里插入图片描述
最后运行新的消费者,消费处理后的数据,只需修改topic
在这里插入图片描述
可以看到,flink英文变成了大写,简单接入完成

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com