我有一个休息后端点,它使用 Spring Cloud Stream Kafka Binder 消耗数据并写入 Kafka。目前我们还没有任何错误处理。但我们希望通过在数据未写入 Kafka 时添加额外检查来使此端点具有容错能力。我们打算在数据未写入 Kafka 时发送异常信息对象。我试图以这种方式使用全局错误来实现这一点
@ServiceActivator(inputChannel = "errorChannel")
public void handle(final ErrorMessage em) {
logger.error("encountered exception" + em.getOriginalMessage().toString());
throw new RuntimeException(em.getOriginalMessage().toString);
}
我的疑问有两个:
- 当我们写入 Kafka 的数据失败时,这是处理异常的正确方法吗?
- 每当数据写入失败时,是否会调用此句柄方法,并且此更改是否会传播到系统级错误处理。
如果还有其他流程,请提出建议。我们目前正在探索应用程序级错误处理和全局级错误处理。系统级错误处理目前暂不考虑。提前致谢。
最佳答案
您必须使用 errorChannelEnabled
选择异步错误处理 https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_producer_properties
通过将 kafka 生产者属性 sync
设置为 true
,您可以在发送线程上获得异常 https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties
关于java - 使用 Spring Cloud Stream Kafka Binder 时 Kafka Producer 中的错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67723256/