java - 处理批处理记录后如何提交kafka offset

标签 java performance spring-kafka

我正在使用 spring-kafka 并使用来自 kafka 主题的批处理记录,并通过 AbstractMessageListenerContainer.AckMode.BATCH 提交偏移量。

在我的例子中,处理批处理记录需要时间(大约 20 秒),并且消费者线程会等待批处理完成,然后再次进行轮询(在此轮询中提交偏移量)。在这种情况下,我将把记录的 List 分配给一个线程(名称:ProcessThread),该线程将处理所有记录并将结果返回给消费者线程,然后消费者线程将记录结果。 (在所有这个过程中,消费者线程将等待,直到从ProcessThread获取结果,这导致性能低下。

有什么方法可以让ProcessThread负责向kafka提交偏移量吗?这样消费者线程就不需要等待,并且对于每个轮询它将创建一个新的processThread

就我而言,我的主题有 20 个分区和 10 个 Pod,每个 Pod 有 2 个消费者线程(Spring Kafka 并发消费者),每个轮询 100 条记录(使用 Spring Boot 线程 @Async 处理所有这些记录)

通过上述配置,我可以在 2 小时内处理 100 万条记录,但我需要将其缩短到至少 40 分钟。

感谢任何帮助😊

最佳答案

您应该至少升级到 1.3.7。当前版本是2.1.10。其中没有单独的线程。只有消费者线程可以发送偏移量。消费者不是线程安全的。

您不应该切换到单独的线程。使用更高的并发和更多的分区。

关于java - 处理批处理记录后如何提交kafka offset,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52617111/

相关文章:

java - 在双向对象上使用 Hibernate 和 Jackson 时出现 StackOverflow 异常

java - 组合和排列算法(递归)

c# - 这个 LINQ 可以更高效吗?

spring-boot - DeadLetterPublishingRecoverer 中的 Kafka 序列化器与 Spring 消息转换器

java - 如何用 DefaultErrorHandler (spring-kafka) 替换已弃用的 SeekToCurrentErrorHandler?

java - 单元测试spring-kafka消费者时如何在KafkaEmbedded中设置端口

java - 结合 BigQuery 和 Pub/Sub Apache Beam

java - 比较两个对象的相等性有哪些替代方法?

css - 衡量单个 CSS 规则对页面渲染速度影响的工具?

mysql - bool 字段上的索引用于删除分区表中的记录