java - kafka启用默认的循环分区器

标签 java apache-kafka

在我的一个应用程序中,我需要在我的 kafka 生产者上采用循环 key 分区策略。

写入不同分区仅适用于以下设置(1):

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandomPartioner.class);

MyRandomPartitioner类的实现如下:

public class MyRandomPartioner implements Partitioner {
    private Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        logger.info(" Partition of Topic :" + numPartitions);
            Random randomGenerator = new Random();
            int randomInt = randomGenerator.nextInt(4) + 1;
            logger.info(" selected Partition of Topic :" + randomInt);
            return  randomInt;

    }

    @Override
    public void close() {
    }

} 

由于我想要平均分配,因此我禁用了上述 Prop (1),然后它总是写入单个分区。

我的生产者代码:

void sendData(String operation, String message){
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(producerKafkaConfig.getTopicName(), operation,message);
            producer.send(record, new ProducerCallback()); 
        }
//Here operation is always fixed and message is my actual content. 

最佳答案

由于您的记录是键和值,默认分区程序将检查键,如果键不存在,则只会进行正常分区,否则将根据键计算哈希值。

如果无法删除您记录的 key ,您可以使用以下分区器代码

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] 

    valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();

            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }

        }

关于java - kafka启用默认的循环分区器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56925631/

相关文章:

java - 如何以编程方式编译和实例化 Java 类?

java - 在 Java 中使用 Timer 重置矩形位置

java - 在不同的 WAR 之间共享一个线程池

queue - 简单的拉取消息队列

java - 在 IntelliJ IDEA 中,如何为作用于当前实例的方法调用着色?

java - ElasticsearchSinkConnector 对象映射无法从嵌套更改为非嵌套

apache-kafka - 如果我在主题级别和生产者级别设置 'compression.type',则优先

apache-kafka - 单个分区上的多个主题?

apache-kafka - kafka + 如何计算 log.retention.byte 的值

java - 如何正确实现自动登录接口(interface),Liferay 6.2