java - Spring Cloud Stream发送到Kafka错误控制处理

标签 java apache-kafka spring-cloud-stream

如何捕获写入 Kafka 主题的错误?

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import javax.xml.bind.JAXBException;

@Component
@EnableBinding(Processor.class)
public class Parser {

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String process(Message<?> input) {

        // do smth

        // how to catch an error if sending message to 'output' topic fails
    }

}

将生产者切换到同步模式。

spring.cloud.stream.kafka.bindings.output.producer.sync=true

接下来怎么办?有什么例子吗?扩展一些绑定(bind)实现,添加一些 AOP 魔法?

最佳答案

我认为您需要注册一个 KafkaProducerListener 来处理您案例中的错误场景,并使其在您的应用程序中作为 bean 可用。

有关 KafkaProducerListener 的更多信息是 here

另请注意,通常失败的消息将在 errorChannel 上以 channel 名称 error 提供。一旦您配置了错误 channel 的destination名称,就会发布所有错误消息:spring.cloud.stream.bindings.error.destination

关于java - Spring Cloud Stream发送到Kafka错误控制处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40440731/

相关文章:

java - 我使用从 maven 生成的 jar 通过 install4j 为我的应用程序创建了一个安装程序。为什么执行.exe找不到主类?

java - PHP 的 password_hash 和 password_verify 的 Java 等价物是什么?

cassandra - Cassandra 的 Kafka Sink 连接器失败

apache-kafka - 如何修复 kafka.common.errors.TimeoutException : Expiring 1 record(s) xxx ms has passed since batch creation plus linger time

java - 通过RabbitMQ维护correlationId

java - Kafka 生产者无法发送消息,出现 NOT_LEADER_FOR_PARTITION 异常

java - 准备好的语句有不同数量的 where 子句

java - 将鼠标移到 Chrome 网页元素上时计时器会加快

apache-kafka - 如何初始化 Apache-Zookeeper 的白名单?

apache-kafka-streams - Spring Cloud Stream 中是否可以有多个@StreamListener?