kafka整合webflux
1、引入依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-webflux</artifactid>
</dependency>
<dependency>
<groupid>io.projectreactor.kafka</groupid>
<artifactid>reactor-kafka</artifactid>
<version>1.1.0.release</version>
</dependency>2、代码示例
@component
public class kafkaservice {
private static final objectmapper object_mapper = new objectmapper();
private kafkasender<string, string> kafkasender;
private kafkareceiver<string, string> kafkareceiver;
@postconstruct
public void init() {
final map<string, object> producerprops = new hashmap<>();
producerprops.put(producerconfig.key_serializer_class_config, integerserializer.class);
producerprops.put(producerconfig.value_serializer_class_config, stringserializer.class);
producerprops.put(producerconfig.bootstrap_servers_config, "localhost:9092");
final senderoptions<string, string> produceroptions = senderoptions.create(producerprops);
this.kafkasender = kafkasender.create(produceroptions);
final map<string, object> consumerprops = new hashmap<>();
consumerprops.put(consumerconfig.key_deserializer_class_config, integerdeserializer.class);
consumerprops.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
consumerprops.put(consumerconfig.client_id_config, "payment-validator-1");
consumerprops.put(consumerconfig.group_id_config, "payment-validator");
consumerprops.put(consumerconfig.bootstrap_servers_config, "localhost:9092");
receiveroptions<string, string> consumeroptions = receiveroptions.<string, string>create(consumerprops)
.subscription(collections.singleton("demo"))
.addassignlistener(partitions -> system.out.println("onpartitionsassigned " + partitions))
.addrevokelistener(partitions -> system.out.println("onpartitionsrevoked " + partitions));
kafkareceiver<string, string> kafkareceiver = kafkareceiver.create(consumeroptions);
kafkareceiver.receive().doonnext(r -> {
system.out.println(r.value());
r.receiveroffset().acknowledge();
}).subscribe();
this.kafkareceiver = kafkareceiver;
}
public mono< ?> send() {
senderrecord<string, string, object> senderrecord = senderrecord.create(new producerrecord<>("demo", value()), 1);
return kafkasender.send(mono.just(senderrecord)).next();
}
private string value() {
map<string, string> map = new hashmap<>();
map.put("name", uuid.randomuuid().tostring());
try {
return object_mapper.writevalueasstring(map);
} catch (jsonprocessingexception e) {
return "{}";
}
}
}3、其它
server:
port: 8888
spring:
jackson:
serialization:
fail_on_empty_beans: false总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论