
Kafka Integration with WebFlux in Practice

March 6, 2026


1. Add the dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

2. Code example

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Component
public class KafkaService {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private KafkaSender<String, String> kafkaSender;
    private KafkaReceiver<String, String> kafkaReceiver;

    @PostConstruct
    public void init() {
        // Producer: key and value are both Strings, so use StringSerializer for each
        // (matching the KafkaSender<String, String> type parameters)
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<String, String> producerOptions = SenderOptions.create(producerProps);
        this.kafkaSender = KafkaSender.create(producerOptions);

        // Consumer: deserializers must mirror the serializers above
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<String, String> consumerOptions = ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton("demo"))
                .addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions))
                .addRevokeListener(partitions -> System.out.println("onPartitionsRevoked " + partitions));
        KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(consumerOptions);
        // Log each incoming record's value, then acknowledge its offset
        kafkaReceiver.receive().doOnNext(r -> {
            System.out.println(r.value());
            r.receiverOffset().acknowledge();
        }).subscribe();
        this.kafkaReceiver = kafkaReceiver;
    }

    public Mono<?> send() {
        // The trailing argument (1) is correlation metadata echoed back in the send result
        SenderRecord<String, String, Object> senderRecord =
                SenderRecord.create(new ProducerRecord<>("demo", value()), 1);
        return kafkaSender.send(Mono.just(senderRecord)).next();
    }

    private String value() {
        Map<String, String> map = new HashMap<>();
        map.put("name", UUID.randomUUID().toString());
        try {
            return OBJECT_MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "{}";
        }
    }
}
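
To actually trigger `send()` over HTTP, the service can be wired into a WebFlux controller. The following is a minimal sketch, not part of the original article: the controller name and the `/send` mapping are hypothetical, chosen only to illustrate how the reactive `Mono` returned by `send()` flows straight out as the HTTP response.

```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// Hypothetical controller exposing KafkaService.send() as an HTTP endpoint
@RestController
public class KafkaController {

    private final KafkaService kafkaService;

    public KafkaController(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    @PostMapping("/send")
    public Mono<String> send() {
        // Complete the HTTP response only after the broker acknowledges the record
        return kafkaService.send().thenReturn("sent");
    }
}
```

With the server on port 8888 (see the configuration in section 3), `curl -X POST http://localhost:8888/send` would publish a record to the `demo` topic, which the receiver in `KafkaService` then prints and acknowledges.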

3. Other configuration

server:
  port: 8888

spring:
  jackson:
    serialization:
      fail_on_empty_beans: false

Summary

The above is based on my personal experience; I hope it serves as a useful reference.
