java - Apache Kafka - 关于主题/分区的 KafkaStream

标签 java multithreading concurrency apache-kafka

我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。为更多消费者提供服务的多个分区将适合此用例。最好的消费方式是拥有多个流阅读器。根据文档或可用示例,ConsumerConnector 给出的 KafkaStreams 数量基于主题数量。想知道如何 [基于分区] 获得多个 KafkaStream 读取器,以便我可以跨每个流一个线程,或者在多个线程中从同一个 KafkaStream 中读取会从多个分区进行并发读取?

非常感谢任何见解。

最佳答案

想分享我从邮件列表中发现的内容:

您在主题图中传递的数字控制一个主题被分成多少个流。在您的情况下,如果您传入 1,则所有 10 个分区的数据都将被送入 1 个流。如果传入 2,则 2 个流中的每一个都会从 5 个分区中获取数据。如果传入 11 个,则其中 10 个将从 1 个分区中获取数据,而 1 个流则什么也得不到。

通常,您需要在自己的线程中迭代每个流。这是因为如果没有新事件,每个流都可能永远阻塞。

示例片段:

topicCount.put(msgTopic, new Integer(partitionCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);

for (final KafkaStream stream : streams) {
    ReadTask task = new ReadTask(stream, msgTopic);
    task.addObserver(this.msgObserver);
    tasks.add(task); executor.submit(task);
}

引用:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sHyy_Z903dOmnjp7_yYR_aE2sRW-x7XpAnqkmWaP66GOqf6w@mail.gmail.com%3E

关于java - Apache Kafka - 关于主题/分区的 KafkaStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23590808/

相关文章:

java - 一些 Perl 行不会从 Java Runtime.getRuntime().exec() 执行

java - 图像不会出现在 jar 中

java - java中如何设置软件的过期日期?

java - JNativeHook 来自同一操作的多个响应(鼠标单击、按键类型和鼠标滚动)

java - 线程池执行器具有优先任务并避免饥饿

Node.js:同时处理多个非常繁重的请求,对所有请求进行单一响应

java - 我的 ThreadLocal 始终包含并返回 null

JavaFX:HBox中具有相同宽度的按钮

c# - 多线程问题

database - 在并发访问数据库的上下文中,锁和闩锁有什么区别?