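In one of my applications, I need a round-robin partitioning strategy on my Kafka producer, even though my records carry a key.
Writing to different partitions only works with the following setting (1):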
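props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandomPartitioner.class);
The MyRandomPartitioner class is implemented as follows:
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyRandomPartitioner implements Partitioner {
    private final Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);
    private final Random randomGenerator = new Random();

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        logger.info(" Partition of Topic :" + numPartitions);
        // Pick a partition index in [0, numPartitions) so it is always valid for the topic.
        int randomInt = randomGenerator.nextInt(numPartitions);
        logger.info(" selected Partition of Topic :" + randomInt);
        return randomInt;
    }

    @Override
    public void close() {
    }
}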
Since I want an even distribution, I disabled property (1) above, and then the producer always writes to a single partition.
My producer code:
void sendData(String operation, String message) {
    // operation (the record key) is always fixed; message is my actual content.
    final ProducerRecord<String, String> record =
            new ProducerRecord<>(producerKafkaConfig.getTopicName(), operation, message);
    producer.send(record, new ProducerCallback());
}
Best answer
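Because your records have both a key and a value, the default partitioner checks the key: if no key is present, it falls back to its normal keyless partitioning; otherwise it computes a hash of the key and uses that to pick the partition. Your key (operation) is always the same, so the hash is always the same and every record lands on the same partition.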
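To make the effect of a fixed key concrete, here is a minimal sketch of key-hash partitioning. It is not Kafka's code: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch substitutes `Arrays.hashCode` as a stand-in hash. The names `KeyHashDemo` and `partitionFor` and the key "UPDATE" are hypothetical and chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: mimics "hash(key) % numPartitions" with a stand-in hash.
class KeyHashDemo {
    // Maps a serialized key to a partition index in [0, numPartitions).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes);       // stand-in for Kafka's murmur2 (assumption)
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit, then wrap
    }

    public static void main(String[] args) {
        byte[] key = "UPDATE".getBytes(StandardCharsets.UTF_8); // hypothetical fixed key
        for (int i = 0; i < 3; i++) {
            // The same key yields the same partition on every call.
            System.out.println(partitionFor(key, 4));
        }
    }
}
```

Whatever hash function is used, the mapping is deterministic, so a producer that always sends the same key cannot spread its records across partitions without a custom partitioner or a varying key.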
If you cannot remove the key from your records, you can use the following partitioner code, which ignores the key and rotates through the partitions:
// Utils is org.apache.kafka.common.utils.Utils.
// nextValue(topic) is a per-topic counter helper that is not shown in this answer.
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
        // Rotate over the partitions that currently have a leader.
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}
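The `nextValue(topic)` call in the partitioner above is not defined in the answer. One plausible shape for it, sketched here with only the JDK, is a thread-safe counter per topic. The class name `RoundRobinCounter`, the starting value of 0, and the topic name "demo-topic" are assumptions made for illustration; `toPositive` mirrors what `Utils.toPositive` does (clearing the sign bit) so the sketch runs without the Kafka client on the classpath.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: one counter per topic, advanced once per record.
class RoundRobinCounter {
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Returns the next value in this topic's sequence; safe to call from several threads.
    int nextValue(String topic) {
        return counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
    }

    // Clears the sign bit so the modulo result stays non-negative after the counter overflows.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    public static void main(String[] args) {
        RoundRobinCounter counter = new RoundRobinCounter();
        int numPartitions = 3;
        for (int i = 0; i < 6; i++) {
            System.out.print(toPositive(counter.nextValue("demo-topic")) % numPartitions + " ");
        }
        // prints: 0 1 2 0 1 2
    }
}
```

In a real partitioner the map would be a field of the `Partitioner` implementation and `nextValue` a private method on it. Kafka clients from version 2.4 onward also ship `org.apache.kafka.clients.producer.RoundRobinPartitioner`, which may make a custom class unnecessary if upgrading is an option.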
Regarding "java - Kafka: enabling the default round-robin partitioner", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56925631/