我有一个场景,其中有多个 Kafka 主题(每个主题有一个分区)和一个消费者组来消费记录。我在消费者组中使用单个模式匹配的消费者来匹配所有主题,从而消耗所有主题中的所有记录。
我现在想要扩大规模并让多个消费者(在同一消费者组中)收听所有主题。然而,这似乎不起作用,因为所有记录仅由组中的第一个消费者使用,使得组中的其他消费者毫无用处。另外,我使用 ExecutorService
将消费者作为单独的线程运行。
我怎样才能实现这个目标? 下面是我的代码:
模式pattern = Pattern.compile(topicPattern); Consumer.subscribe(pattern);
上面代码中发送的模式与所有主题的名称相匹配, 例如。
如果主题名称为 sample_topic_1
、sample_topic_2
等,我们将其与 sample_topic_*$
进行匹配。
最佳答案
您描述的方法应该适用于您发布的代码。然而,这可能是因为没有足够的数据供多个消费者使用的情况。或者,数据可能以“突发”的形式出现,这些数据足够小,可以容纳在单个批处理中。
尽管 Kafka 中的负载理论上分布在同一消费者组的所有消费者上,但实际上,如果只有一个“批处理”的数据,那么第一个消费者就可以获取所有数据,而其他人就没有任何数据了。这意味着:
- 您没有发送足够的数据来分发给所有消费者(尝试发送更多数据来验证这一点),或者
- 您有一个奇怪的配置,其中配置的批处理非常巨大,并且/或
linger.ms
属性配置得非常高,或者 - 以上两者的组合。
我建议首先尝试发送更多数据,看看是否可以解决问题。如果不是,请尝试缩减至仅 1 个消费者,验证其是否仍然有效。然后,只需向该消费者组再添加一个消费者,并查看行为是否发生变化。
关于java - 与 Kafka 消费者匹配的扩展模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59651369/