java - 如果花费太多时间,如何取消 AsyncRestTemplate HTTP 请求?

标签 java spring guava resttemplate spring-4

从一开始,我一直对如何处理 InterruptedException 以及如果花费太多时间如何正确取消 http 请求感到困惑。我有一个库,其中我为我们的客户提供了两种方法,同步和异步。他们可以调用他们认为适合他们目的的任何方法。

  • executeSync() - 等待我得到结果,然后返回结果。
  • executeAsync() - 立即返回一个 Future,如果需要,可以在其他事情完成后进行处理。

他们将通过 DataKey包含用户 ID 和超时值的对象。我们将根据用户 ID 确定要调用哪台机器,然后使用该机器创建一个 URL,我们将使用 AsyncRestTemplate 对 URL 进行 http 调用然后根据是否成功将响应发送回给他们。

我正在使用 exchange AsyncRestTemplate的方法|返回一个 ListenableFuture我想使用基于 NIO 的客户端连接的异步非阻塞架构,以便请求使用非阻塞 IO,这就是我选择 AsyncRestTemplate 的原因.这种方法听起来适合我的问题定义吗?该库将在非常重的负载下用于生产。

下面是我的界面:

public interface Client {
    // for synchronous
    public DataResponse executeSync(DataKey key);

    // for asynchronous
    public ListenableFuture<DataResponse> executeAsync(DataKey key);
}

下面是我的接口(interface)实现:

public class DataClient implements Client {

    // using spring 4 AsyncRestTemplate
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    // for synchronous
    @Override
    public DataResponse executeSync(DataKey keys) {
        Future<DataResponse> responseFuture = executeAsync(keys);
        DataResponse response = null;

        try {
            response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // do we need to catch InterruptedException here and interrupt the thread?
            Thread.currentThread().interrupt();
            // also do I need throw this RuntimeException at all?
            throw new RuntimeException("Interrupted", ex);
        } catch (TimeoutException ex) {
            DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
            response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
        } catch (Exception ex) {
            DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        }

        return response;
    }

    // for asynchronous     
    @Override
    public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {

        final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
        final org.springframework.util.concurrent.ListenableFuture orig = 
            restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);

        orig.addCallback(
                new ListenableFutureCallback<ResponseEntity<String>>() {
                    @Override
                    public void onSuccess(ResponseEntity<String> result) {
                        responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
                                DataStatusEnum.SUCCESS));
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
                        responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
                                DataStatusEnum.ERROR));
                    }
                });

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor());
        return responseFuture;
    }
}

客户会从他们的代码中这样调用 -

// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);

// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);

现在的问题是——

  1. 我们可以打断AsyncRestTemplate如果 http 请求花费的时间太长,请调用?我实际上是在调用 cancel在我的future在我上面的代码中 executeSync方法,但我不确定如何验证它以确保它正在做它应该做的事情?我想将取消传播回原来的 future ,这样我就可以取消相应的 http 请求(我可能想要这样做以节省资源),这就是为什么我在我的 executeAsync 方法中添加了一个监听器的原因。我相信,我们不能打断RestTemplate打电话但不确定 AsyncRestTemplate我们是否可以这样做。如果假设我们可以中断 AsyncRestTemplate通话,那么我是否正确地中断了 http 通话?或者有没有更好/更清洁的方法来做到这一点?还是我什至需要担心用 AsyncRestTemplate 取消 Http 请求?我现在的设计?

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor()); 
    

    使用当前设置,我可以看到它在某些时候(不是每次)都抛出 CancellationException - 这是否意味着我的 HTTP 请求被取消了?

  2. 我也在 InterruptedException 的 catch block 中做正确的事在 executeSync方法?如果没有,那么处理它的正确方法是什么。我需要处理InterruptedException就我而言?
  3. 是不是默认AsyncRestTamplete每个线程使用阻塞调用和请求?如果是,那么有没有办法在我当前的设置中建立基于 NIO 的客户端连接?

任何解释/代码建议都会有很大帮助。

最佳答案

首先,您为什么使用 SettableFuture?为什么不能只返回 AsyncRestTemplate 返回的 ListenableFuture?

1. Can we interrupt AsyncRestTemplate call if http request is taking too long?

你当然知道!您只需要调用 Future.cancel 方法。此方法会中断 AsyncRestTemplate 实际使用的内部 RestTemplate 的执行。

2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?

正如 Phil 和 Danilo 所说,您不需要在 InterruptedException catch block 中中断当前线程。当必须取消请求的执行时,只需执行您需要执行的任何操作即可。

事实上,我建议你创建一个处理这种行为的方法,比如handleInterruption,并将这个方法用于TimeoutExceptionInterruptedException

3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?

是的。 AsyncRestTamplete 的默认构造函数在内部使用 SimpleClientHttpRequestFactorySimpleAsyncTaskExecutor

这个TaskExecutor总是对每一个任务启动一个威胁,并且从不重用线程,所以效率非常低:

 * TaskExecutor implementation that fires up a new Thread for each task,
 * executing it asynchronously.
 *
 * Supports limiting concurrent threads through the "concurrencyLimit"
 * bean property. By default, the number of concurrent threads is unlimited.
 *
 * NOTE: This implementation does not reuse threads! Consider a
 * thread-pooling TaskExecutor implementation instead, in particular for
 * executing a large number of short-lived tasks.
 *

我建议你使用 AsyncRestTemplate 的另一种配置。

您应该使用使用另一个 TaskExecutor 的 AsyncRestTemplate 的构造函数:

public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)

例如:

AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));

此 ExecutorService (Executors.newCachedThreadPool()) 会根据需要创建新线程,但会在可用时重用之前构建的线程。

或者更好的是,您可以使用另一个 RequestFactory。例如,您可以使用 HttpComponentsAsyncClientHttpRequestFactory,内部使用 NIO,只需调用 AsyncRestTemplate 的正确构造函数:

new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())

不要忘记 AsyncRestTemplate 的内部行为将取决于您如何创建对象。

关于java - 如果花费太多时间,如何取消 AsyncRestTemplate HTTP 请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29380653/

相关文章:

java - JAVA 进程的输入总是挂起

Maven 3.0.4 NoSuchMethod : . .. java.lang.NoSuchMethodError : com. google.common.collect.ImmutableSet.copyOf(..)

Java + Lombok + Guava + 验证

java - 如何实现 View 幻灯片

java - 如何确定表达式在堆栈中是否具有平衡括号?

java - GWT:如何处理 mouseUp 事件?

java - @ResponseBody 将对象的所有值设置为 null。 Spring Controller

spring - 在 JPA 实体监听器中注入(inject) spring bean

支持列表的 Spring 绑定(bind)/速度

java - 可以通过值的类名称推断键名称的 HashMap