使用 KafkaTemplate
将记录发送到主题,该主题配置为 bean,如下所示:
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
,一个会做:
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
...
SendResult<Object, Object> sendResult = kafkaTemplate.send(topic, object).get();
并且会通过将上面的内容包装在 try/catch block 中来捕获 InterruptedException
和 ExecutionException
,如下所示
try {
SendResult<Object, Object> sendResult = kafkaTemplate.send(topic, object).get();
if (sendResult.getRecordMetadata() != null && sendResult.getRecordMetadata().hasOffset()) {
//some code
} else {
//some code
}
} catch (InterruptedException | ExecutionException e) {
logger.error("An error has occurred: ", e);
}
最近,我了解到,发生中断异常时的最佳实践是将其重新抛出到 catch block 中,如下所示:
} catch (InterruptedException | ExecutionException e) {
logger.error("An error has occurred: ", e);
Thread.currentThread().interrupt();
}
(1) 在 KafkaTemplate
上下文中是否建议这样做?我倾向于认为不,因为我看到的所有示例都没有重新抛出中断。
(2) 如果是,好处是什么?
(3) 如果不重新抛出中断,有什么缺点吗?
最佳答案
这是基本的中断处理,与 Kafka 无关。
Thread.currentThread().interrupt();
是的,这是最佳实践。
您不是在那里“重新抛出”中断,而是设置中断位,这样,如果在线程上执行下游可中断操作,它也会被中断。
如果不设置中断位,会有很大的缺点。当线程被中断时,通常,应用程序希望线程退出它正在执行的操作。
考虑:
public void method2() {
...
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
// ignore
}
}
public void method1() throws InterruptedException {
...
method2();
System.out.println("method2 returned ok");
Thread.sleep(Long.MAX_VALUE);
}
线程将挂起并且永远不会退出,因为您“吃掉”了中断。
但是,您不应该不在多重捕获中执行此操作:
} catch (InterruptedException | ExecutionException e) {
logger.error("An error has occurred: ", e);
Thread.currentThread().interrupt();
}
这将设置两个异常的中断位,而不仅仅是 InterruptedException
。
关于spring-kafka - 在执行 KafkaTemplate.send 时,在 catch block 中捕获 InterruptedException 时是否应该重新抛出中断?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59661416/