如何捕获写入 Kafka 主题的错误?
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import javax.xml.bind.JAXBException;
@Component
@EnableBinding(Processor.class)
public class Parser {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String process(Message<?> input) {
// do smth
// how to catch an error if sending message to 'output' topic fails
}
}
将生产者切换到同步模式。
spring.cloud.stream.kafka.bindings.output.producer.sync=true
接下来怎么办?有什么例子吗?扩展一些绑定(bind)实现,添加一些 AOP 魔法?
最佳答案
我认为您需要注册一个 KafkaProducerListener 来处理您案例中的错误场景,并使其在您的应用程序中作为 bean 可用。
有关 KafkaProducerListener 的更多信息是 here
另请注意,通常失败的消息将在 errorChannel
上以 channel 名称 error
提供。一旦您配置了错误 channel 的destination
名称,就会发布所有错误消息:spring.cloud.stream.bindings.error.destination
。
关于java - Spring Cloud Stream发送到Kafka错误控制处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40440731/