当前位置: 代码网 > it编程>编程语言>C/C++ > kafka自定义分区器使用详解

kafka自定义分区器使用详解

2025年11月19日 C/C++ 我要评论
kafka自定义分区器根据企业需求,自己重新实现分区器只需要定义类实现partitioner接口,然后重写partition()方法即可假设现在有一个需求发送过来的数据中如果包含cuihaida,就发

kafka自定义分区器

根据企业需求,自己重新实现分区器

只需要定义类实现partitioner接口,然后重写partition()方法即可

假设现在有一个需求

发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区

package com.example.kafkademo.producer;

import org.apache.kafka.clients.producer.partitioner;
import org.apache.kafka.common.cluster;

import java.util.map;

/**
 * 1. 实现接口partitioner
 * 2. 实现3个方法:partition,close,configure
 * 3. 编写partition方法,返回分区号
 */
public class mypartitioner implements partitioner {

    /**
     * 重写这个方法
     * @param topic 主题
     * @param key 消息的key
     * @param keybytes 消息的key序列化后的字节数组
     * @param value 消息的值
     * @param valuebytes 消息的值序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return 信息对应的分区
     */
    @override
    public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {
        // 获取消息
        string msgvalue = value.tostring();
        // 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
        return msgvalue.contains("cuihaida") ? 0 : 1;
    }

    @override
    public void close() {

    }

    @override
    public void configure(map<string, ?> map) {

    }
}

使用分区器的方法

在生产者的配置中添加分区器参数

package com.example.kafkademo.util;

import org.apache.kafka.clients.producer.producerconfig;

import java.util.properties;

public class commonutils {
    /**
     * kafka生产者配置配置
     * @return 配置内容
     */
    public static properties buildkafkaproperties() {
        // 1. 创建kafka生产者配置对象
        properties properties = new properties();
        // 2. 给kafka的配置对象添加信息
        properties.put(producerconfig.bootstrap_servers_config, "hadoop102:9092");
        // key, value初始化【必须有】
        properties.put(producerconfig.key_serializer_class_config, "org.apache.kafka.common.serialization.stringserializer");
        properties.put(producerconfig.value_serializer_class_config, "org.apache.kafka.common.serialization.stringserializer");
        
        // =========> 添加自定义分区器 <============
        properties.put(producerconfig.partitioner_class_config, "com.example.kafkademo.producer.mypartitioner")
        
        return properties;
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com