我正在尝试在 Spring MVC 中实现一个向 Kafka 发送消息的异步 REST 方法。一切正常,但是当服务器不可用时,会长时间处理 onFailure 事件。例如,如何将 ListenableFuture 中的响应时间限制为三秒。
这是我的代码:
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Value("${spring.kafka.topic}")
String topic;
@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
@RequestParam(value = "message") String message
) {
DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> sendResult) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
@Override
public void onFailure(Throwable ex) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
});
return deferredResult;
}
我尝试使用 Kafka 的 REQUEST_TIMEOUT_MS_CONFIG
属性和 ListenableFuture 的 .get(long timeout, TimeUnit unit)
方法,但没有得到想要的结果。
最佳答案
那是因为生产者阻塞了 60 秒(默认情况下)。
参见 max.block.ms
in the KafkaDocumentation for producer configuration .
max.block.ms
The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
关于java - 如何为 onFailure 事件设置超时(Spring,Kafka)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49487619/