Now suppose the following requirement: our business system needs to exchange messages with a service platform through Kafka, consuming its services asynchronously. The system architecture may be dated: not the full Spring Cloud stack, not even Spring Boot, just a plain Java project or a Java web project.
Dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.4.1</version>
</dependency>

Note: kafka_2.11 (the Scala core jar) is only needed here for kafka.utils.ShutdownableThread, which the consumer below extends. It pulls in its own kafka-clients 2.4.1 transitively, so if you hit classpath conflicts with kafka-clients 3.1.0 you may want to align the two versions.
Kafka configuration reader class
The class isn't used in the rest of this post (the configuration is filled in directly, to keep things simple), but for production it is worth having: it can read keys from several configuration files and fall back to a default value when a key is missing.
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Kafka configuration reader.
 *
 * @author zzy
 */
public class KafkaProperties {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);

    private static Properties serverProps = new Properties();
    private static Properties clientProps = new Properties();
    private static Properties producerProps = new Properties();
    private static Properties consumerProps = new Properties();

    private static KafkaProperties instance = null;

    private KafkaProperties() {
        String filePath = System.getProperty("user.dir") + File.separator + "kafkaconf" + File.separator;
        loadIfExists(filePath + "producer.properties", producerProps);
        loadIfExists(filePath + "consumer.properties", consumerProps);
        loadIfExists(filePath + "server.properties", serverProps);
        loadIfExists(filePath + "client.properties", clientProps);
    }

    /**
     * Load one properties file if it exists; try-with-resources
     * closes each stream individually.
     */
    private static void loadIfExists(String path, Properties props) {
        File file = new File(path);
        if (!file.exists()) {
            return;
        }
        try (FileInputStream fis = new FileInputStream(file)) {
            props.load(fis);
        } catch (IOException e) {
            LOG.error("init kafka props error. " + e.getMessage());
        }
    }

    /**
     * Lazily initialized singleton.
     */
    public static synchronized KafkaProperties getInstance() {
        if (instance == null) {
            instance = new KafkaProperties();
        }
        return instance;
    }

    /**
     * Get a config value, falling back to the given default when the key is missing.
     */
    public String getValue(String key, String defaultValue) {
        if (StringUtils.isEmpty(key)) {
            LOG.error("key is null or empty");
        }
        String value = getPropsValue(key);
        if (value == null) {
            LOG.warn("kafka property getValue returned null, the key is " + key);
            value = defaultValue;
        }
        LOG.info("kafka property getValue, key: " + key + ", value: " + value);
        return value;
    }

    /**
     * Look the key up in server, producer, consumer, then client properties, in that order.
     */
    private String getPropsValue(String key) {
        String value = serverProps.getProperty(key);
        if (value == null) {
            value = producerProps.getProperty(key);
        }
        if (value == null) {
            value = consumerProps.getProperty(key);
        }
        if (value == null) {
            value = clientProps.getProperty(key);
        }
        return value;
    }
}
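For reference, a minimal sketch of how the class might be used; the key names are standard Kafka configs, and the default values here are illustrative, not from the original setup:

// Hypothetical usage: values come from kafkaconf/*.properties when present,
// otherwise the supplied defaults are returned.
KafkaProperties kafkaProps = KafkaProperties.getInstance();
String servers = kafkaProps.getValue("bootstrap.servers", "localhost:9092");
String groupId = kafkaProps.getValue("group.id", "testgroup");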
Producer
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka producer.
 *
 * @author zzy
 */
public class KafkaProFormal {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaProFormal.class);

    private Properties properties = new Properties();

    private final String bootstrapServers = "bootstrap.servers";
    private final String clientId = "client.id";
    private final String keySerializer = "key.serializer";
    private final String valueSerializer = "value.serializer";
    //private final String securityProtocol = "security.protocol";
    //private final String saslKerberosServiceName = "sasl.kerberos.service.name";
    //private final String kerberosDomainName = "kerberos.domain.name";
    private final String maxRequestSize = "max.request.size";

    private KafkaProducer<String, String> producer;

    private volatile static KafkaProFormal kafkaProFormal;

    private KafkaProFormal(String servers) {
        properties.put(bootstrapServers, servers);
        properties.put(keySerializer, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }

    /**
     * Double-checked locking singleton; the servers argument only takes effect on the first call.
     */
    public static KafkaProFormal getInstance(String servers) {
        if (kafkaProFormal == null) {
            synchronized (KafkaProFormal.class) {
                if (kafkaProFormal == null) {
                    kafkaProFormal = new KafkaProFormal(servers);
                }
            }
        }
        return kafkaProFormal;
    }

    public void sendStringWithCallback(String topic, String message, boolean asyncFlag) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        long startTime = System.currentTimeMillis();
        if (asyncFlag) {
            // asynchronous send: returns immediately, the callback reports the result
            producer.send(record, new KafkaCallback(startTime, message));
        } else {
            // synchronous send: get() blocks until the broker acknowledges
            try {
                producer.send(record, new KafkaCallback(startTime, message)).get();
            } catch (InterruptedException e) {
                LOG.error("InterruptedException occurred", e);
            } catch (ExecutionException e) {
                LOG.error("ExecutionException occurred", e);
            }
        }
    }
}

class KafkaCallback implements Callback {

    private static Logger log = LoggerFactory.getLogger(KafkaCallback.class);

    private long startTime;
    private String message;

    KafkaCallback(long startTime, String message) {
        this.startTime = startTime;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            log.info("record(" + message + ") sent to partition(" + metadata.partition()
                    + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms.");
        } else {
            log.error("metadata is null. record(" + message + ")", exception);
        }
    }
}
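To illustrate the two send modes (broker address, topic, and message are placeholders): the asynchronous path returns at once and lets KafkaCallback log the partition and offset later; the synchronous path blocks on the internal Future's get() until the broker acknowledges.

KafkaProFormal producer = KafkaProFormal.getInstance("1.1.1.1:9092");
// fire-and-forget from the caller's point of view; the result arrives in the callback
producer.sendStringWithCallback("demo_topic", "hello kafka", true);
// blocks until acknowledged (or the failure is logged), at the cost of throughput
producer.sendStringWithCallback("demo_topic", "hello kafka", false);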
Consumer
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/**
 * Kafka consumer.
 *
 * @author zzy
 */
public abstract class KafkaConFormal extends ShutdownableThread {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConFormal.class);

    // initialized here so the constructor's add() does not throw a NullPointerException
    private Set<String> topics = new HashSet<>();

    private final String bootstrapServers = "bootstrap.servers";
    private final String groupId = "group.id";
    private final String keyDeserializer = "key.deserializer";
    private final String valueDeserializer = "value.deserializer";
    private final String enableAutoCommit = "enable.auto.commit";
    private final String autoCommitIntervalMs = "auto.commit.interval.ms";
    private final String sessionTimeoutMs = "session.timeout.ms";

    private KafkaConsumer<String, String> consumer;

    public KafkaConFormal(String topic) {
        super("KafkaConsumerExample", false);
        topics.add(topic);

        Properties props = new Properties();
        props.put(bootstrapServers, "your servers");
        props.put(groupId, "testgroup");
        props.put(enableAutoCommit, "true");
        props.put(autoCommitIntervalMs, "1000");
        props.put(sessionTimeoutMs, "30000");
        props.put(keyDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(valueDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Subscribe and handle messages. Invoked repeatedly by ShutdownableThread's run() loop;
     * re-subscribing with the same topic set is harmless and picks up changes made via setTopics().
     */
    @Override
    public void doWork() {
        consumer.subscribe(topics);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        dealRecords(records);
    }

    /**
     * Implement this when instantiating the consumer to process the polled records.
     *
     * @param records records
     */
    public abstract void dealRecords(ConsumerRecords<String, String> records);

    public void setTopics(Set<String> topics) {
        this.topics = topics;
    }
}
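The constructor hard-codes its settings ("your servers" is a placeholder). In production the values could instead come from the KafkaProperties class above; a sketch, with illustrative key names and defaults:

// Hypothetical wiring: read consumer settings via KafkaProperties instead of hard-coding.
KafkaProperties kp = KafkaProperties.getInstance();
Properties props = new Properties();
props.put("bootstrap.servers", kp.getValue("bootstrap.servers", "localhost:9092"));
props.put("group.id", kp.getValue("group.id", "testgroup"));
props.put("enable.auto.commit", kp.getValue("enable.auto.commit", "true"));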
Usage
KafkaProFormal producer = KafkaProFormal.getInstance("1.1.1.1:9092,2.2.2.2:9092");

KafkaConFormal consumer = new KafkaConFormal("consume_topic") {
    @Override
    public void dealRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            // forward each consumed value to the target topic, asynchronously
            producer.sendStringWithCallback("target_topic", record.value(), true);
        }
    }
};
consumer.start();
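When the process exits, it's worth stopping the polling thread cleanly rather than killing it mid-poll. A sketch, assuming the ShutdownableThread API of the 2.4.x core jar, where initiateShutdown() flags the doWork() loop to stop and awaitShutdown() waits for it to finish:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.initiateShutdown();   // ask the doWork() loop to exit
    consumer.awaitShutdown();      // wait until the thread has finished
}));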
Summary
The above is based on my personal experience. I hope it gives you a useful reference, and I hope you will keep supporting 代码网.