java - 如果 Kafka 服务器不可用,则终止 Spring Kafka 事务

标签 java apache-kafka spring-kafka

在我们的项目中,我们需要通过多个线程定期从第三方获取数据,然后将这些数据推送到Kafka。如果 Kafka 服务器当前不可用,则应终止流程,所获取的数据应丢失并在下一次计划执行期间重新获取。另外,还需要使用事务管理,因为我们需要将批量消息发送到 Kafka 的不同主题。如果一条消息未发送,则所有其他消息都应回滚。

我们遇到了如果 Kafka 服务器不可用则终止执行的问题。当事务管理被禁用时,一切正常,我们得到

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

但是启用事务管理后,Kafka 生产者会尝试无限地访问服务器,并且所有发起将消息推送到 Kafka 的线程都会被粘住。我们尝试了不同的设置,让它在一段时间不可用后失败,但没有帮助。 设置spring.kafka.producer.retries: 0spring.kafka.producer.acks: 0原因(不是完整的堆栈跟踪):

Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.

Caused by: org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

当 Kafka 服务器不可用且启用了事务管理时,是否存在某些设置组合会导致 Kafka Producer 在超时后失败?这有可能吗?

最佳答案

Spring-Kafka 使用 DefaultAfterRollbackProcessor 来查找失败的偏移量并重试 - 这将继续循环,直到正确处理偏移量。这是默认行为。如果事务失败,您将回滚,这取决于 @Transactional 上的 rollbackFor 属性。

有一种特殊情况,如果 Kafka 服务器不可用,它将回滚。您可以通过实现 AfterRollbackProcessor 来创建自己的处理器。您需要区分定期回滚和由于连接超时而导致的回滚。

编辑: 您还可以在 @Transactional 上定义属性 noRollbackFor 以排除 TimeoutException 并让此异常到达 Container。您可以创建自定义 ExceptionHandler 并在容器上使用 setErrorHandler(..)。您可以阅读有关容器错误处理程序的信息 here

关于java - 如果 Kafka 服务器不可用,则终止 Spring Kafka 事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51359577/

相关文章:

java - System.out.print 语句的意外输出

java - 适用于 Java 的 AndroidPublisher (V3) Google API 客户端库示例

spring-boot - spring kafka中使用SeekToCurrentErrorHandler时如何设置重试间隔时间

java - 这是 Spring-Kafka 文档中有关 BatchErrorHandler 的错误吗?

java - 通过子查询缩小查询结果

java - 从Java网站获取代理?

hadoop - 使用 NIFI 从 Kafka 插入到 Cassandra

apache-kafka - 如果 RocksDB 缓存在内存中,为什么要在 Kafka Streams Processor API 中启用记录缓存?

java - 如何在 Spring Boot 中反序列化 Kafka 主题中的 Json 字符串

java - 如何安全地取消订阅 Kafka 中的主题