java - 消费者之间的kafka消息分发

标签 java apache-kafka kafka-consumer-api

我有一个非常简单的 kafka 用例,我在 2 个分区之间面临消息分发问题。

我有 2 个主题分区,每个分区有 2 个消费者。我可以看到更多的消息进入了特定的分区,只有一个消费者正在获取要处理的消息,而另一个订阅了消息较少的分区的消费者永远处于闲置状态。两个消费者都具有相同的组 ID。我无法通过这个问题实现水平缩放。

下面是我放置的关键配置。

kafka.session.timeout.ms=10000
kafka.auto.commit=false
kafka.maxpoll.interval.ms=50000
kafka.request.timeout.ms=15000
kafka.maxpoll.records=100

**PS:**名称来 self 的 prop 文件,与真实的 kafka 属性名称不完全匹配。 我需要较大的最大轮询间隔以便一次性处理大块。任何猜测我需要在配置中添加或更改它吗?

最佳答案

正如在其他答案中提到的,kafka 使用键的散列来决定分区。可能是您的 key 分布不均。在这种情况下,您可以定义自己的策略以在生成记录时按生产者选择分区。 创建自定义 partitoner 类并实现其分区方法如下。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;

public class CustomPartitioner implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages to have a key");
        // Your logic to decide partition based on key
        return 0;// Here return thr partition decided based on key
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }
}

在生产者配置上添加以下内容

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getCanonicalName());
property 

关于java - 消费者之间的kafka消息分发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54951561/

相关文章:

java - Java HashMap使用通配符嵌套泛型

java - 如何在 Quarkus 中为 Kafka 使用 TLS?

java - Avro 解码给出 java.io.EOFException

apache-kafka - 无法描述Kafka Streams Consumer Group

apache-kafka - 限制Kafka消费者的记录数

java - Spring RestTemplate - 根据 http 状态代码读取不同的对象类型?

java - 点与平均 vector 之间的马哈拉诺比斯距离始终相同

java - 从变换矩阵中寻找角度

java - Apache Kafka + Kryo 序列化

apache-kafka - Kafka Nodes 和 zookeeper 将如何相互通信?