java - 与 Kafka 消费者匹配的扩展模式

标签 java apache-kafka kafka-consumer-api

我有一个场景,其中有多个 Kafka 主题(每个主题有一个分区)和一个消费者组来消费记录。我在消费者组中使用单个模式匹配的消费者来匹配所有主题,从而消耗所有主题中的所有记录。

我现在想要扩大规模并让多个消费者(在同一消费者组中)收听所有主题。然而,这似乎不起作用,因为所有记录仅由组中的第一个消费者使用,使得组中的其他消费者毫无用处。另外,我使用 ExecutorService 将消费者作为单独的线程运行。

我怎样才能实现这个目标? 下面是我的代码:

模式pattern = Pattern.compile(topicPattern); Consumer.subscribe(pattern);

上面代码中发送的模式与所有主题的名称相匹配, 例如。

如果主题名称为 sample_topic_1sample_topic_2 等,我们将其与 sample_topic_*$ 进行匹配。

最佳答案

您描述的方法应该适用于您发布的代码。然而,这可能是因为没有足够的数据供多个消费者使用的情况。或者,数据可能以“突发”的形式出现,这些数据足够小,可以容纳在单个批处理中。

尽管 Kafka 中的负载理论上分布在同一消费者组的所有消费者上,但实际上,如果只有一个“批处理”的数据,那么第一个消费者就可以获取所有数据,而其他人就没有任何数据了。这意味着:

  • 您没有发送足够的数据来分发给所有消费者(尝试发送更多数据来验证这一点),或者
  • 您有一个奇怪的配置,其中配置的批处理非常巨大,并且/或 linger.ms 属性配置得非常高,或者
  • 以上两者的组合。

我建议首先尝试发送更多数据,看看是否可以解决问题。如果不是,请尝试缩减至仅 1 个消费者,验证其是否仍然有效。然后,只需向该消费者组再添加一个消费者,并查看行为是否发生变化。

关于java - 与 Kafka 消费者匹配的扩展模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59651369/

相关文章:

java - 能够从不同的类访问文本区域

java - 在Eclipse中使用Jsoup时出错

Java:如何确定文件所在的驱动器类型?

jdbc - 如何配置kafka集群进行批处理

java - 如何知道java kafka应用程序客户端中是否达到了max.poll.interval.ms?

java - 如何使用 KStreams 将 Kafka 主题的数据写入文件?

java - JPA 一个事务内的多次提交

java - Spark Streaming 中的 Kafka 消费者

apache-spark - Spark Streaming中如何将压缩数据写入Kafka?

performance - 使用 TLS 的 Kafka 消费者。性能问题