Requirement: a producer serializes data with Avro and sends it to Kafka; a consumer deserializes it with Avro and writes the data to the database through MyBatis-Plus.
1. Environment
[1] Apache Avro 1.8; [2] Spring Kafka 1.2; [3] Spring Boot 1.5; [4] Maven 3.5. The full pom.xml is shown below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.codenotfound</groupId>
    <artifactId>spring-kafka-avro</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>spring-kafka-avro</name>
    <description>Spring Kafka - Apache Avro Serializer Deserializer Example</description>
    <url>https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
        <avro.version>1.8.2</avro.version>
    </properties>

    <dependencies>
        <!-- spring-boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- spring-boot-maven-plugin -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!-- avro-maven-plugin: generates Java classes from .avsc schemas -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                            <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. The Avro Schema File
[1] Avro relies on schemas made up of primitive types and defined in JSON. The Apache Avro getting-started guide uses a "user" schema stored in a user.avsc file under src/main/resources/avro; here I use electronicspackage.avsc instead. The namespace specifies the package of the Java class that will be generated, and name determines the name of the generated class:
{"namespace": "com.yd.cyber.protocol.avro", "type": "record", "name": "electronicspackage", "fields": [ {"name":"package_number","type":["string","null"],"default": null}, {"name":"frs_site_code","type":["string","null"],"default": null}, {"name":"frs_site_code_type","type":["string","null"],"default":null}, {"name":"end_allocate_code","type":["string","null"],"default": null}, {"name":"code_1","type":["string","null"],"default": null}, {"name":"aggregat_package_code","type":["string","null"],"default": null} ] }
[2] Avro ships with code generation, which lets us create a Java class automatically from the schema defined above. Once the class has been generated, the schema no longer needs to be referenced directly in the program. The class can be generated with avro-tools.jar or, since this is a Maven project, by invoking compile from the Maven Projects panel, which produces the ElectronicsPackage.java file automatically; I use the Maven route here.
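For reference, the avro-tools route is a single command; a sketch assuming the jar version matches the avro dependency and this project's schema/output paths:

java -jar avro-tools-1.8.2.jar compile schema src/main/resources/avro/electronicspackage.avsc target/generated/avro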
[3] This results in an ElectronicsPackage.java class that contains the schema and a number of builder methods for constructing ElectronicsPackage objects.
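As an illustration, a record can then be built in code without touching the schema; a minimal sketch (the field value is a placeholder):

ElectronicsPackage pkg = ElectronicsPackage.newBuilder()
        .setPackageNumber("PKG-001") // placeholder; the remaining nullable fields default to null
        .build();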
3. Producing Avro Messages to a Kafka Topic
Kafka stores and transports byte arrays in its topics, so when we work with Avro objects we need to convert to and from those byte arrays. Before version 0.9.0.0 the Kafka Java API handled this conversion with implementations of the Encoder/Decoder interfaces; in the newer API these have been replaced by implementations of the Serializer/Deserializer interfaces. Kafka ships with a number of built-in (de)serializers, but none for Avro. To solve this, we create an AvroSerializer class that implements the Serializer interface specifically for Avro objects. We then implement the serialize() method, which takes the topic name and a data object as input; in our case the object is an Avro object extending SpecificRecordBase. The method serializes the Avro object to a byte array and returns the result. This class is generic: write it once and reuse it for every schema.
package com.yd.cyber.web.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Generic Avro serializer: works for any class generated from an Avro schema.
 * @author zzx
 * @creat 2020-03-11-19:17
 */
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {}

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        // Write the record through a binary encoder into a byte array
        DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream, null);
        try {
            writer.write(data, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
        } catch (IOException e) {
            throw new SerializationException(e.getMessage());
        }
        return byteArrayOutputStream.toByteArray();
    }
}
4. The AvroConfig Configuration Class
The Avro settings live in the AvroConfig configuration class, which we now change to use our custom Serializer implementation. This is done by setting the VALUE_SERIALIZER_CLASS_CONFIG property to the AvroSerializer class. In addition, we change the generic types of ProducerFactory and KafkaTemplate to specify ElectronicsPackage rather than String. When there are several types to serialize, this configuration class needs a corresponding set of beans for each object you want to serialize.
package com.yd.cyber.web.avro;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import com.yd.cyber.protocol.avro.ElectronicsPackage;

/**
 * @author zzx
 * @creat 2020-03-11-20:23
 */
@Configuration
@EnableKafka
public class AvroConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.max-request-size}")
    private String maxRequestSize;

    @Bean
    public Map<String, Object> avroProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, ElectronicsPackage> elProducerFactory() {
        return new DefaultKafkaProducerFactory<>(avroProducerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ElectronicsPackage> elKafkaTemplate() {
        return new KafkaTemplate<>(elProducerFactory());
    }
}
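The two @Value placeholders above must resolve from application.properties; a minimal sketch, where the broker address and request-size value are assumptions for illustration:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.max-request-size=5242880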
5. Sending Messages with KafkaTemplate
Finally, a controller class calls the send() method of KafkaTemplate, passing an Avro ElectronicsPackage object as input. Note that we also updated the KafkaTemplate generic types.
package com.yd.cyber.web.controller.aggregation;

import com.yd.cyber.protocol.avro.ElectronicsPackage;
import com.yd.cyber.web.vo.ElectronicsPackageVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * <p>
 * InnoDB free: 4096 kB - front controller (generated by MyBatis-Plus)
 * </p>
 *
 * @author zzx
 * @since 2020-04-19
 */
@RestController
@RequestMapping("/electronicsPackageTbl")
public class ElectronicsPackageController {

    // logger
    private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageController.class);

    @Resource
    private KafkaTemplate<String, ElectronicsPackage> kafkaTemplate;

    @GetMapping("/push")
    public void push() {
        // Build a test VO, copy it into the generated Avro class, and send it
        ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();
        electronicsPackageVO.setElectId(9);
        electronicsPackageVO.setAggregatPackageCode("9");
        electronicsPackageVO.setCode1("9");
        electronicsPackageVO.setEndAllocateCode("9");
        electronicsPackageVO.setFrsSiteCodeType("9");
        electronicsPackageVO.setFrsSiteCode("9");
        electronicsPackageVO.setPackageNumber("9");
        ElectronicsPackage electronicsPackage = new ElectronicsPackage();
        BeanUtils.copyProperties(electronicsPackageVO, electronicsPackage);
        // send the message
        kafkaTemplate.send("electronics_package", electronicsPackage);
        log.info("Sent to topic electronics_package");
    }
}
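With the application running, one GET request publishes a test record; assuming the default port 8080 and no context path:

curl http://localhost:8080/electronicsPackageTbl/push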
6. Consuming and Deserializing Avro Messages from the Kafka Topic
Received messages need to be deserialized back to the Avro format. For this we create an AvroDeserializer class that implements the Deserializer interface. The deserialize() method takes the topic name and a byte array as input and decodes them back into an Avro object. The schema needed for decoding is retrieved from the targetType class parameter, which must be passed to the AvroDeserializer constructor.
package com.yd.cyber.web.avro;

import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.DatatypeConverter;

/**
 * Generic Avro deserializer.
 * @author fuyx
 * @creat 2020-03-12-15:19
 */
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    // logger
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);

    // target class whose schema is used for decoding
    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {}

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                return null;
            }
            LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            // Obtain the schema from the target type and decode the byte array
            ByteArrayInputStream in = new ByteArrayInputStream(data);
            DatumReader<GenericRecord> userDatumReader =
                    new SpecificDatumReader<>(targetType.newInstance().getSchema());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
            T result = (T) userDatumReader.read(null, decoder);
            LOGGER.debug("deserialized data='{}'", result);
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }
}
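A quick way to sanity-check the pair outside Kafka is a round trip through both classes; a minimal sketch, not part of the original project (the field value is a placeholder):

// Serialize a record to bytes and decode it back
AvroSerializer<ElectronicsPackage> serializer = new AvroSerializer<>();
AvroDeserializer<ElectronicsPackage> deserializer = new AvroDeserializer<>(ElectronicsPackage.class);

ElectronicsPackage original = new ElectronicsPackage();
original.setPackageNumber("PKG-001"); // placeholder value

byte[] bytes = serializer.serialize("electronics_package", original);
ElectronicsPackage restored = deserializer.deserialize("electronics_package", bytes);
// restored.getPackageNumber() now holds "PKG-001"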
7. The Deserialization Configuration
I put both the deserialization and the serialization configuration in the AvroConfig class. AvroConfig is updated so that AvroDeserializer is used as the value of the VALUE_DESERIALIZER_CLASS_CONFIG property. We also change the generic types of ConsumerFactory and ConcurrentKafkaListenerContainerFactory to specify ElectronicsPackage rather than String. The DefaultKafkaConsumerFactory is created with a new AvroDeserializer that takes ElectronicsPackage.class as a constructor argument. That Class<T> targetType is what allows the AvroDeserializer to deserialize the consumed byte[] into the proper target object (in this example, the ElectronicsPackage class).
// Consumer-side beans of the same AvroConfig class shown above

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.yd.cyber.protocol.avro.ElectronicsPackage;

@Configuration
@EnableKafka
public class AvroConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.max-request-size}")
    private String maxRequestSize;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");
        return props;
    }

    @Bean
    public ConsumerFactory<String, ElectronicsPackage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new AvroDeserializer<>(ElectronicsPackage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
8. Consuming the Messages
The consumer listens to the corresponding topic with @KafkaListener. One thing to note: examples found online pass the concrete type directly as the listener parameter, which here would mean receiving an ElectronicsPackage, but when I wrote it that way the error log kept reporting a serialization problem. Receiving the message as a GenericRecord, the type my deserializer works with, causes no problems. The received message is then saved to the database through MyBatis-Plus.
package com.zzx.cyber.web.controller.datasource.intercompany;

import com.zzx.cyber.web.service.ElectronicsPackageService;
import com.zzx.cyber.web.vo.ElectronicsPackageVO;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;

import javax.annotation.Resource;

/**
 * @desc: consumes electronics_package messages and persists them
 * @author: zzx
 * @creatdate 2020/4/19 12:21
 */
@Controller
public class ElectronicsPackageConsumerController {

    // logger
    private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageConsumerController.class);

    // service layer
    @Resource
    private ElectronicsPackageService electronicsPackageService;

    /**
     * Consume and persist records from the topic
     * @param genericRecord the deserialized Avro record
     */
    @KafkaListener(topics = {"electronics_package"})
    public void receive(GenericRecord genericRecord) throws Exception {
        log.info("Received: ElectronicsPackage = {}", genericRecord.toString());
        // VO generated by MyBatis-Plus
        ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();
        // copy the received data into the VO
        BeanUtils.copyProperties(genericRecord, electronicsPackageVO);
        try {
            // persist to the database
            log.info("Saving to database");
            electronicsPackageService.save(electronicsPackageVO);
        } catch (Exception e) {
            throw new Exception("Insert failed: " + e);
        }
    }
}
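One caveat: Spring's BeanUtils.copyProperties matches JavaBean getters, while GenericRecord exposes its fields through get(name) under the snake_case Avro names, so if the copy comes up empty, mapping the fields explicitly is the fallback. A minimal sketch, assuming the VO setters shown earlier:

// Hypothetical explicit mapping from the Avro record to the VO
electronicsPackageVO.setPackageNumber(java.util.Objects.toString(genericRecord.get("package_number"), null));
electronicsPackageVO.setFrsSiteCode(java.util.Objects.toString(genericRecord.get("frs_site_code"), null));
// ...and so on for the remaining fields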
This concludes the walkthrough of integrating Spring Boot with Avro and Kafka.