spring-kafka - 在执行 KafkaTemplate.send 时,在 catch block 中捕获 InterruptedException 时是否应该重新抛出中断?

标签 spring-kafka

使用 KafkaTemplate 将记录发送到主题,该主题配置为 bean,如下所示:

@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

,一个会做:

@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
...
SendResult<Object, Object> sendResult = kafkaTemplate.send(topic, object).get();

并且会通过将上面的内容包装在 try/catch block 中来捕获 InterruptedExceptionExecutionException ,如下所示

try {
  SendResult<Object, Object> sendResult = kafkaTemplate.send(topic, object).get();
  if (sendResult.getRecordMetadata() != null && sendResult.getRecordMetadata().hasOffset()) {
     //some code
  } else {
     //some code
  }
} catch (InterruptedException | ExecutionException e) {     
  logger.error("An error has occurred: ", e);
}

最近,我了解到,发生中断异常时的最佳实践是将其重新抛出到 catch block 中,如下所示:

} catch (InterruptedException | ExecutionException e) {     
  logger.error("An error has occurred: ", e);
  Thread.currentThread().interrupt();
}

(1) 在 KafkaTemplate 上下文中是否建议这样做?我倾向于认为不,因为我看到的所有示例都没有重新抛出中断。
(2) 如果是,好处是什么?
(3) 如果不重新抛出中断,有什么缺点吗?

最佳答案

这是基本的中断处理,与 Kafka 无关。

Thread.currentThread().interrupt();

是的,这是最佳实践。

您不是在那里“重新抛出”中断,而是设置中断位,这样,如果在线程上执行下游可中断操作,它也会被中断。

如果不设置中断位,会有很大的缺点。当线程被中断时,通常,应用程序希望线程退出它正在执行的操作。

考虑:

public void method2() {

    ...
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {
        // ignore
    }

}

public void method1() throws InterruptedException {

    ...
    method2();
    System.out.println("method2 returned ok");
    Thread.sleep(Long.MAX_VALUE);

}

线程将挂起并且永远不会退出,因为您“吃掉”了中断。

但是,您不应该在多重捕获中执行此操作:

} catch (InterruptedException | ExecutionException e) {     
  logger.error("An error has occurred: ", e);
  Thread.currentThread().interrupt();
}

这将设置两个异常的中断位,而不仅仅是 InterruptedException

关于spring-kafka - 在执行 KafkaTemplate.send 时,在 catch block 中捕获 InterruptedException 时是否应该重新抛出中断?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59661416/

相关文章:

apache-kafka - KStream 将记录发送到多个流(不是分支)

java - 每个主题是否可以有一个 Kafka 消费者线程?

spring-kafka - 使用spring Kafka时如何为生产者添加错误处理程序

apache-kafka - Spring Kafka 无法正常关闭

java - AWS Kafka (MSK) - 如何生成 keystore 和信任库并在我的 Spring Cloud Stream 应用程序中使用它们?

apache-kafka - Spring kafka消费者@retryabletopic无限重试

java - 使用 Spring Kafka 处理死信主题的消息

java - Spring 卡夫卡 : Read from two different topics in order

java - Kafka Consumer 无法为 JsonDeserializer 消费 905 字节的消息

java - Spring Cloud 流卡夫卡 : Duplicate @StreamListener mapping for 'input'