在我们的项目中,我们需要通过多个线程定期从第三方获取数据,然后将这些数据推送到Kafka。如果 Kafka 服务器当前不可用,则应终止流程,所获取的数据应丢失并在下一次计划执行期间重新获取。另外,还需要使用事务管理,因为我们需要将批量消息发送到 Kafka 的不同主题。如果一条消息未发送,则所有其他消息都应回滚。
我们遇到了如果 Kafka 服务器不可用则终止执行的问题。当事务管理被禁用时,一切正常,我们得到 p>
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
但是启用事务管理后,Kafka 生产者会尝试无限地访问服务器,并且所有发起将消息推送到 Kafka 的线程都会被粘住。我们尝试了不同的设置,让它在一段时间不可用后失败,但没有帮助。
设置spring.kafka.producer.retries: 0
或spring.kafka.producer.acks: 0
原因(不是完整的堆栈跟踪):
Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.
Caused by: org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
当 Kafka 服务器不可用且启用了事务管理时,是否存在某些设置组合会导致 Kafka Producer 在超时后失败?这有可能吗?
最佳答案
Spring-Kafka 使用 DefaultAfterRollbackProcessor
来查找失败的偏移量并重试 - 这将继续循环,直到正确处理偏移量。这是默认行为。如果事务失败,您将回滚,这取决于 @Transactional
上的 rollbackFor
属性。
有一种特殊情况,如果 Kafka 服务器不可用,它将回滚。您可以通过实现 AfterRollbackProcessor 来创建自己的处理器。您需要区分定期回滚和由于连接超时而导致的回滚。
编辑:
您还可以在 @Transactional
上定义属性 noRollbackFor
以排除 TimeoutException
并让此异常到达 Container
。您可以创建自定义 ExceptionHandler 并在容器上使用 setErrorHandler(..)
。您可以阅读有关容器错误处理程序的信息 here
关于java - 如果 Kafka 服务器不可用,则终止 Spring Kafka 事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51359577/