我正在使用 spring-kafka
并使用来自 kafka 主题的批处理记录,并通过 AbstractMessageListenerContainer.AckMode.BATCH
提交偏移量。
在我的例子中,处理批处理记录需要时间(大约 20 秒),并且消费者线程会等待批处理完成,然后再次进行轮询(在此轮询中提交偏移量)。在这种情况下,我将把记录的 List
分配给一个线程(名称:ProcessThread
),该线程将处理所有记录并将结果返回给消费者线程,然后消费者线程将记录结果。 (在所有这个过程中,消费者线程将等待,直到从ProcessThread
获取结果,这导致性能低下。
有什么方法可以让ProcessThread
负责向kafka提交偏移量吗?这样消费者线程就不需要等待,并且对于每个轮询它将创建一个新的processThread
就我而言,我的主题有 20 个分区和 10 个 Pod,每个 Pod 有 2 个消费者线程(Spring Kafka 并发消费者),每个轮询 100 条记录(使用 Spring Boot 线程 @Async 处理所有这些记录
)
通过上述配置,我可以在 2 小时内处理 100 万条记录,但我需要将其缩短到至少 40 分钟。
感谢任何帮助😊
最佳答案
您应该至少升级到 1.3.7。当前版本是2.1.10。其中没有单独的线程。只有消费者线程可以发送偏移量。消费者不是线程安全的。
您不应该切换到单独的线程。使用更高的并发和更多的分区。
关于java - 处理批处理记录后如何提交kafka offset,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52617111/