java - 如果一个分区受到限制,如何对 kafka 中的剩余分区应用循环法

标签 java apache-kafka kafka-producer-api

我限制了主题的一个分区用于特定服务(因此所有请求都将到达此处以获取服务 X)。对于任何其他服务请求将到达剩余的 N 个分区。

在java中,我通过org.apache.kafka.clients. Producer.Partitioner接口(interface)实现了它。

@Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String partitionKey = (String) key;

        if(Channel.DB.getValue().equalsIgnoreCase(partitionKey) && ( KafkaTopic.TRANS.getValue().equalsIgnoreCase(topic) || KafkaTopic.CONS.getValue().equalsIgnoreCase(topic) )){
            return 1; // this is reserved for SERVICE X only
        }

        return 0; // here i want to produce messages on remaining partitions, how to return partition now?
    }

问题: 1:如何返回分区号在这种情况下 2:如何以循环方式生成其他消息,不包括服务 X 的分区。

我正在使用 Apache Kafka 9.0.1。

最佳答案

下面的代码对我有用 - 这里的想法是,当 key 不适用于保留分区时,您可以从可用分区列表中删除该特定分区,并对剩余分区进行循环。

private final AtomicInteger counter = new AtomicInteger(0);

public static final int SPECIAL_PARTITION_ID = 1;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    String partitionKey = (String) key;

    if ("SPECIAL_CUSTOMER".equals(partitionKey)) {
        LOGGER.info("PARTITION= " + SPECIAL_PARTITION_ID);
        return SPECIAL_PARTITION_ID; //special partition reserved for MY_SPECIAL_CUSTOMER
    } else {
        int nextValue = counter.getAndIncrement();

        List<PartitionInfo> availablePartitions = new ArrayList<>(cluster.availablePartitionsForTopic(topic));

        if (availablePartitions.size() > 0) {

            PartitionInfo specialPartition = null;

            for (PartitionInfo partitionInfo : availablePartitions) {
                if (partitionInfo.partition() == SPECIAL_PARTITION_ID) {
                    specialPartition = partitionInfo;
                    break;
                }
            }

            availablePartitions.remove(specialPartition);

            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            //optional -- depending upon your usecase
            while (true) {
                int p = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                if(p != SPECIAL_PARTITION_ID) {
                    return p;
                }
            }
        }
    }
}

如果您可以只确保一个键始终进入保留分区,而其他键可能会进行循环(包括特殊分区),那么您可以通过在键用于保留分区时传递partitionId来轻松实现它,否则根本不传递键,这可以节省您编写自定义分区程序。

此外,如果您不介意保留分区是最后一个分区,其余分区则分配给其他分区,则可能有一个更简单的实现(摘自《Kafka:权威指南》一书)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {} 1

    public int partition(String topic, Object key, byte[] keyBytes,
                           Object value, byte[] valueBytes,
                             Cluster cluster) {
        List<PartitionInfo> partitions =
          cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceOf String))) 2
            throw new InvalidRecordException("We expect all messages
              to have customer name as key")

        if (((String) key).equals("Banana"))
            return numPartitions; // Banana will always go to last
                                     partition

        // Other records will get hashed to the rest of the
           partitions
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }

    public void close() {}
}

关于java - 如果一个分区受到限制,如何对 kafka 中的剩余分区应用循环法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48885460/

相关文章:

java - Hadoop 字数统计期间出现异常

apache-kafka - kafka分区重新平衡(分配)花费太多时间

go - 通过golang将消息以avro格式推送到kafka

apache-kafka - 如何从嵌套 JSON 对象创建 KSQLdb 流字段

java - Kafka Producer将消息发布到单个分区

apache-kafka - 卡夫卡消费者错误

java - 有没有办法在 Spring Data Neo4j 4 中拥有可嵌入类

java - 什么是热刷新?

java - 如何在数据库触发器创建 id 时禁用生成器

java - 如何编写 Kafka 消费者——单线程 vs 多线程