kafka自定义分区器
根据企业需求,自己重新实现分区器
只需要定义类实现partitioner接口,然后重写partition()方法即可
假设现在有一个需求
发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
package com.example.kafkademo.producer;
import org.apache.kafka.clients.producer.partitioner;
import org.apache.kafka.common.cluster;
import java.util.map;
/**
* 1. 实现接口partitioner
* 2. 实现3个方法:partition,close,configure
* 3. 编写partition方法,返回分区号
*/
public class mypartitioner implements partitioner {
/**
* 重写这个方法
* @param topic 主题
* @param key 消息的key
* @param keybytes 消息的key序列化后的字节数组
* @param value 消息的值
* @param valuebytes 消息的值序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return 信息对应的分区
*/
@override
public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {
// 获取消息
string msgvalue = value.tostring();
// 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
return msgvalue.contains("cuihaida") ? 0 : 1;
}
@override
public void close() {
}
@override
public void configure(map<string, ?> map) {
}
}
使用分区器的方法
在生产者的配置中添加分区器参数
package com.example.kafkademo.util;
import org.apache.kafka.clients.producer.producerconfig;
import java.util.properties;
public class commonutils {
/**
* kafka生产者配置配置
* @return 配置内容
*/
public static properties buildkafkaproperties() {
// 1. 创建kafka生产者配置对象
properties properties = new properties();
// 2. 给kafka的配置对象添加信息
properties.put(producerconfig.bootstrap_servers_config, "hadoop102:9092");
// key, value初始化【必须有】
properties.put(producerconfig.key_serializer_class_config, "org.apache.kafka.common.serialization.stringserializer");
properties.put(producerconfig.value_serializer_class_config, "org.apache.kafka.common.serialization.stringserializer");
// =========> 添加自定义分区器 <============
properties.put(producerconfig.partitioner_class_config, "com.example.kafkademo.producer.mypartitioner")
return properties;
}
}
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论