java - 如何在 spring-kafka 中的每条 kafka 消息后提交?

标签 java apache-kafka kafka-consumer-api spring-kafka

在我们的应用程序中,我们使用 kafka 消费者来确定发送电子邮件。

前几天我们遇到了一个问题,kafka 分区在读取和处理其所有记录之前超时。结果,它循环回到分区的开始,无法完成它收到的记录集,并且循环开始后生成的新数据从未得到处理。

我的团队建议我们可以在读取每条消息后告诉 Kafka 提交,但是我不知道如何从 Spring-kakfa 做到这一点。

应用程序使用 spring-kafka 2.1.6,消费者代码有点像这样。

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
    try{
        EmailData data = objectMapper.readValue(message, EmailData.class);
        if(isEligableForEmail(data)){
            emailHandler.sendEmail(data)
        }
    } catch (Exception e) {
        log.error("Error: "+e.getMessage(), e);
    }
}

注意:sendEmail 函数使用 CompletableFutures,因为它必须在发送电子邮件之前调用不同的 API。

配置:(消费者的 yaml 文件片段,以及生产者的一部分)

consumer:
      max.poll.interval.ms: 3600000
producer: 
      retries: 0
      batch-size: 100000
      acks: 0
      buffer-memory: 33554432
      request.timeout.ms: 60000
      linger.ms: 10
      max.block.ms: 5000

最佳答案

如果你想要手动确认,那么你可以在方法参数中提供确认

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, Acknowledgment ack, @Header("kafka_offset") int offSet) {

When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.

来自文档的示例 @KafkaListener Annotation

@KafkaListener(id = "cat", topics = "myTopic",
      containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();

}

关于java - 如何在 spring-kafka 中的每条 kafka 消息后提交?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58566465/

相关文章:

java - JTable,使单元格文本多行

ubuntu - kafka启动错误ubuntu类加载失败

java - 我们可以在断电的情况下丢失 kafka 消息吗?

java - 如何将电子邮件地址隐藏为 url 参数

java - 在 Java 中查找属于给定类型的所有子对象

java - NetBeans:消除文本字段之间的间隙

mongodb - flume 或 kafka 相当于 mongodb

azure - 从浏览器端连接到 Azure IoT/事件中心

java - 如何使用 SASL_SSL 连接 Apache Kafka 设置 Spring Cloud Kafka 项目?

apache-kafka - ConcurrentKafkaListenerContainerFactory 中 kafka.concurrency 的默认值是多少?