现在假设一种需求:我方业务系统要与某服务平台通过 Kafka 交互,异步获取服务,而系统架构可能比较老旧,既不是 Spring Cloud 全家桶,也不是 Spring Boot,只是普通的 Java 项目或者 Java Web 项目。
依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.4.1</version>
</dependency>
kafka配置读取类
本文后边没用到,直接填配置了,简单点
但如果生产需要,还是有这个类比较好,可以从不同配置中读取,同时给个默认值
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
/**
 * Reads Kafka configuration from property files.
 *
 * <p>On construction it looks in the {@code kafkaconf} directory under the JVM working
 * directory ({@code user.dir}) for {@code producer.properties}, {@code consumer.properties},
 * {@code server.properties} and {@code client.properties}. Files that do not exist are
 * skipped, and a file that fails to load is logged without preventing the others from
 * loading, so callers can always fall back to the default passed to {@link #getvalue}.
 *
 * @author zzy
 */
public class kafkaproperties {
    private static final Logger log = LoggerFactory.getLogger(kafkaproperties.class);

    private static kafkaproperties instance = null;

    private final Properties serverprops = new Properties();
    private final Properties clientprops = new Properties();
    private final Properties producerprops = new Properties();
    private final Properties consumerprops = new Properties();

    private kafkaproperties() {
        String filepath = System.getProperty("user.dir") + File.separator
                + "kafkaconf" + File.separator;
        loadIfPresent(filepath + "producer.properties", producerprops);
        loadIfPresent(filepath + "consumer.properties", consumerprops);
        loadIfPresent(filepath + "server.properties", serverprops);
        loadIfPresent(filepath + "client.properties", clientprops);
    }

    /**
     * Loads one property file into {@code target} if the file exists.
     *
     * <p>Each file gets its own try-with-resources stream so every stream is closed,
     * and a failure on one file is logged and does not stop the remaining files.
     */
    private static void loadIfPresent(String path, Properties target) {
        try {
            File file = new File(path);
            if (!file.exists()) {
                return;
            }
            try (FileInputStream fis = new FileInputStream(file)) {
                target.load(fis);
            }
        } catch (IOException | RuntimeException e) {
            // Best effort: configuration files are optional, so log and carry on.
            log.error("init kafka props error. file: {}", path, e);
        }
    }

    /**
     * Returns the lazily created singleton.
     *
     * @return the shared instance; the property files are read on the first call
     */
    public static synchronized kafkaproperties getinstance() {
        if (instance == null) {
            instance = new kafkaproperties();
        }
        return instance;
    }

    /**
     * Looks up a configuration value, falling back to the supplied default.
     *
     * @param key          property name; when null or empty the default is returned
     * @param defaultvalue value returned when the key is missing from every loaded file
     * @return the configured value, or {@code defaultvalue} when none is found
     */
    public String getvalue(String key, String defaultvalue) {
        if (StringUtils.isEmpty(key)) {
            // A null key would otherwise fail inside Properties.getProperty.
            log.error("key is null or empty");
            return defaultvalue;
        }
        String value = getpropsvalue(key);
        if (value == null) {
            log.warn("kafka property getvalue return null, the key is {}", key);
            value = defaultvalue;
        }
        log.info("kafka property getvalue, key:{}, value:{}", key, value);
        return value;
    }

    /**
     * Searches the loaded files in the order server, producer, consumer, client and
     * returns the first match, or {@code null} when no file defines the key.
     */
    private String getpropsvalue(String key) {
        String value = serverprops.getProperty(key);
        if (value == null) {
            value = producerprops.getProperty(key);
        }
        if (value == null) {
            value = consumerprops.getProperty(key);
        }
        if (value == null) {
            value = clientprops.getProperty(key);
        }
        return value;
    }
}
producer
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Kafka producer wrapper exposed as a lazily created singleton.
 *
 * <p>Record keys and values are serialized as strings. The broker list is taken from
 * the first {@link #getinstance(String)} call; later calls return the same instance
 * and their {@code servers} argument is ignored.
 *
 * @author zzy
 */
public class kafkaproformal {
    public static final Logger log = LoggerFactory.getLogger(kafkaproformal.class);

    private static final String bootstrapservers = "bootstrap.servers";
    private static final String keyserializer = "key.serializer";
    private static final String valueserializer = "value.serializer";
    // Must be the exact, case-sensitive class name: Kafka loads the serializer by name.
    private static final String stringserializer =
            "org.apache.kafka.common.serialization.StringSerializer";

    private static volatile kafkaproformal instance;

    private final KafkaProducer<String, String> producer;

    private kafkaproformal(String servers) {
        Properties properties = new Properties();
        properties.put(bootstrapservers, servers);
        properties.put(keyserializer, stringserializer);
        properties.put(valueserializer, stringserializer);
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Returns the shared producer wrapper, creating it on first use.
     *
     * @param servers value for {@code bootstrap.servers}; only used by the call that
     *                actually creates the instance
     * @return the shared instance
     */
    public static kafkaproformal getinstance(String servers) {
        if (instance == null) {
            synchronized (kafkaproformal.class) {
                if (instance == null) {
                    instance = new kafkaproformal(servers);
                }
            }
        }
        return instance;
    }

    /**
     * Sends a string message without a key and logs the outcome through {@code kafkacallback}.
     *
     * @param topic     destination topic
     * @param message   record value
     * @param asyncflag {@code true} to return right after handing the record to the
     *                  producer; {@code false} to block until the send completes
     */
    public void sendstringwithcallback(String topic, String message, boolean asyncflag) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        long starttime = System.currentTimeMillis();
        if (asyncflag) {
            // Asynchronous send: the callback reports the result later.
            producer.send(record, new kafkacallback(starttime, message));
        } else {
            // Synchronous send: wait on the returned future.
            try {
                producer.send(record, new kafkacallback(starttime, message)).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                log.error("InterruptedException occurred while sending to topic {}", topic, e);
            } catch (ExecutionException e) {
                log.error("ExecutionException occurred while sending to topic {}", topic, e);
            }
        }
    }
}
/**
 * Producer callback that logs the outcome of a send together with the elapsed time.
 */
class kafkacallback implements Callback {
    private static final Logger log = LoggerFactory.getLogger(kafkacallback.class);

    private final long starttime;
    private final String message;

    /**
     * @param starttime wall-clock time in milliseconds at which the send was started
     * @param message   record value, included in the log output
     */
    kafkacallback(long starttime, String message) {
        this.starttime = starttime;
        this.message = message;
    }

    /**
     * Logs success (partition and offset) or failure (with the exception).
     *
     * <p>Success is decided by {@code exception == null}: with kafka-clients 3.x a failed
     * send can still pass a non-null placeholder metadata, so a null check on
     * {@code metadata} alone would report failures as successes.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedtime = System.currentTimeMillis() - starttime;
        if (exception == null && metadata != null) {
            log.info("record({}) sent to partition({}), offset({}) in {} ms.",
                    message, metadata.partition(), metadata.offset(), elapsedtime);
        } else {
            log.error("send failed after {} ms. record({})", elapsedtime, message, exception);
        }
    }
}
consumer
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Properties;
import java.util.Set;
/**
* kafka consumer
* @author zzy
*/
public abstract class kafkaconformal extends shutdownablethread {
private static final logger log = loggerfactory.getlogger(kafkaconformal.class);
private set<string> topics;
private final string bootstrapservers = "bootstrap.servers";
private final string groupid = "group.id";
private final string keydeserializer = "key.deserializer";
private final string valuedeserializer = "value.deserializer";
private final string enableautocommit = "enable.auto.commit";
private final string autocommitintervalms = "auto.commit.interval.ms";
private final string sessiontimeoutms = "session.timeout.ms";
private kafkaconsumer<string, string> consumer;
public kafkaconformal(string topic) {
super("kafkaconsumerexample", false);
topics.add(topic);
properties props = new properties();
props.put(bootstrapservers, "your servers");
props.put(groupid, "testgroup");
props.put(enableautocommit, "true");
props.put(autocommitintervalms, "1000");
props.put(sessiontimeoutms, "30000");
props.put(keydeserializer, "org.apache.kafka.common.serialization.stringdeserializer");
props.put(valuedeserializer, "org.apache.kafka.common.serialization.stringdeserializer");
consumer = new kafkaconsumer<>(props);
}
/**
* subscribe and handle the msg
*/
@override
public void dowork() {
consumer.subscribe(topics);
consumerrecords<string, string> records = consumer.poll(duration.ofseconds(1));
dealrecords(records);
}
/**
* 实例化consumer时,进行对消费信息的处理
* @param records records
*/
public abstract void dealrecords(consumerrecords<string, string> records);
public void settopics(set<string> topics) {
this.topics = topics;
}
}
使用
// bootstrap.servers expects a comma-separated list of host:port pairs.
// The producer is a singleton, so the broker list is only read on the first getinstance call.
kafkaproformal producer = kafkaproformal.getinstance("1.1.1.1:9092,2.2.2.2:9092");
// Forward every record consumed from "consume_topic" to "target_topic" asynchronously.
kafkaconformal consumer = new kafkaconformal("consume_topic") {
    @Override
    public void dealrecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            producer.sendstringwithcallback("target_topic", record.value(), true);
        }
    }
};
consumer.start();
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论