java - 如何为 onFailure 事件设置超时(Spring,Kafka)?

标签 java spring spring-mvc apache-kafka spring-kafka

我正在尝试在 Spring MVC 中实现一个向 Kafka 发送消息的异步 REST 方法。一切正常,但是当服务器不可用时,会长时间处理 onFailure 事件。例如,如何将 ListenableFuture 中的响应时间限制为三秒。

这是我的代码:

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.topic}")
String topic;

@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
        @RequestParam(value = "message") String message
) {

    DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> sendResult) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

        @Override
        public void onFailure(Throwable ex) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

    });

    return deferredResult;
}

我尝试使用 Kafka 的 REQUEST_TIMEOUT_MS_CONFIG 属性和 ListenableFuture 的 .get(long timeout, TimeUnit unit) 方法,但没有得到想要的结果。

最佳答案

那是因为生产者阻塞了 60 秒(默认情况下)。

参见 max.block.ms in the KafkaDocumentation for producer configuration .

max.block.ms The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.

关于java - 如何为 onFailure 事件设置超时(Spring,Kafka)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49487619/

相关文章:

java - 新套装 : How to implement the sieve of Eratosthenes in java

java - 动态流布局

java - 什么是原始类型,为什么我们不应该使用它呢?

java - 如何使用 Spring RestTemplate 发送数组?

spring - 如何在 Spring 应用程序的 servlet 过滤器中 Autowiring bean?

java - 如何将两个对象传递给同一个Spring Controller 表单提交?

java - Servlet + Java 的 HttpSession 不起作用

java - 为什么会发生 JsonParseException ?

java - Web应用程序管理上传的文档

java - 在 Spring MVC 验证中,是否可以一次只显示每个字段的一条错误消息?