当前位置: 代码网 > it编程>编程语言>Java > Java版Flink使用指南——定制RabbitMQ数据源的序列化器

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

2024年08月01日 Java 我要评论
在一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

《java版flink使用指南——从rabbitmq中队列中接入消息流》一文中,我们从rabbitmq队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在intellij中新建一个工程sourceserializer。
archetype填入:org.apache.flink:flink-quickstart-java
版本填入与flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增rabbitmq连接器

		<dependency>
			<groupid>org.apache.flink</groupid>
			<artifactid>flink-connector-rabbitmq</artifactid>
			<version>3.0.1-1.17</version>
		</dependency>

新增json库依赖

		<dependency>
			<groupid>com.fasterxml.jackson.core</groupid>
			<artifactid>jackson-core</artifactid>
			<version>2.17.1</version>
		</dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <version>1.18.32</version>
            <scope>provided</scope>
        </dependency>

数据对象

我们新建一个简单的数据对象sampledata
src/main/java/org/example/vo/sampledata.java

package org.example.vo;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.objectmapper;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.jsonprocessingexception;

import lombok.allargsconstructor;
import lombok.data;
import lombok.noargsconstructor;

@data
@noargsconstructor
@allargsconstructor
public class sampledata {
    private long id;
    private string name;
    private int age;
    private boolean married;
    private double salary;

    public string tojson() throws jsonprocessingexception {
        objectmapper mapper = new objectmapper();
        return mapper.writevalueasstring(this);
    }

    public static sampledata fromjson(string json) throws jsonprocessingexception {
        objectmapper mapper = new objectmapper();
        return mapper.readvalue(json, sampledata.class);
    }
}

这个方法包含两个方法,一个是将sampledata 转换成字符串,另一个是将字符串转成sampledata 对象。

序列化器

我们定义的数据源序列化器要实现abstractdeserializationschema接口,主要是通过deserialize方法将二进制数组转换成sampledata 对象。

src/main/java/org/example/serializer/sampledatarabbitmqsourceserializer.java

package org.example.serializer;

import org.apache.flink.api.common.serialization.abstractdeserializationschema;
import org.apache.flink.api.common.typeinfo.typeinformation;
import org.example.vo.sampledata;

import java.io.ioexception;

public class sampledatarabbitmqsourceserializer extends abstractdeserializationschema<sampledata> {
    @override
    public sampledata deserialize(byte[] message) throws ioexception {
        return sampledata.fromjson(new string(message));
    }

    @override
    public boolean isendofstream(sampledata nextelement) {
        return false;
    }

    @override
    public typeinformation<sampledata> getproducedtype() {
        return typeinformation.of(sampledata.class);
    }
}

接入数据源

我们在《java版flink使用指南——定制rabbitmq的sink序列化器》一文中,往data.to.rbtmq对了写入了大量sampledata 数据。这次我们将其作为数据源来做测试
这次我们在创建rmqsource时传入序列化器sampledatarabbitmqsourceserializer。它会将从rabbitmq获取的数据转换成sampledata对象。
然后我们获取所有“已婚”(filter.getmarried() == true)的数据,将其打印到日志中。

		string queuename = "data.to.rbtmq";
		string host = "172.21.112.140"; // ip of the rabbitmq server
		int port = 5672;
		string username = "admin";
		string password = "fangliang";
		string virtualhost = "/";
		int parallelism = 1;

		// create a rabbitmq source
		rmqconnectionconfig rmqconnectionconfig = new rmqconnectionconfig.builder()
				.sethost(host)
				.setport(port)
				.setusername(username)
				.setpassword(password)
				.setvirtualhost(virtualhost)
				.build();

		rmqsource<sampledata> rmqsource = new rmqsource<>(rmqconnectionconfig, queuename, true, new sampledatarabbitmqsourceserializer());
		final datastream<sampledata> stream = env.addsource(rmqsource).name(username + "'s source from " + queuename).setparallelism(parallelism);

		stream.filter(filter -> filter.getmarried() == true).print().name(username + "'s sink to stdout").setparallelism(parallelism);

测试

修改slot个数

由于我们要运行两个流式计算任务,于是需要两个slot。

vim conf/config.yaml 

将numberoftaskslots的值改成2。

打包、提交、运行

我们将本例和《java版flink使用指南——定制rabbitmq的sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/flinkdemo

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com