从一开始,我一直对如何处理 InterruptedException 以及如果花费太多时间如何正确取消 http 请求感到困惑。我有一个库,其中我为我们的客户提供了两种方法,同步和异步。他们可以调用他们认为适合他们目的的任何方法。
- executeSync() - 等待我得到结果,然后返回结果。
- executeAsync() - 立即返回一个 Future,如果需要,可以在其他事情完成后进行处理。
他们将通过 DataKey
包含用户 ID 和超时值的对象。我们将根据用户 ID 确定要调用哪台机器,然后使用该机器创建一个 URL,我们将使用 AsyncRestTemplate 对 URL 进行 http 调用然后根据是否成功将响应发送回给他们。
我正在使用 exchange AsyncRestTemplate
的方法|返回一个 ListenableFuture
我想使用基于 NIO 的客户端连接的异步非阻塞架构,以便请求使用非阻塞 IO,这就是我选择 AsyncRestTemplate
的原因.这种方法听起来适合我的问题定义吗?该库将在非常重的负载下用于生产。
下面是我的界面:
public interface Client {
// for synchronous
public DataResponse executeSync(DataKey key);
// for asynchronous
public ListenableFuture<DataResponse> executeAsync(DataKey key);
}
下面是我的接口(interface)实现:
public class DataClient implements Client {
// using spring 4 AsyncRestTemplate
private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();
// for synchronous
@Override
public DataResponse executeSync(DataKey keys) {
Future<DataResponse> responseFuture = executeAsync(keys);
DataResponse response = null;
try {
response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
// do we need to catch InterruptedException here and interrupt the thread?
Thread.currentThread().interrupt();
// also do I need throw this RuntimeException at all?
throw new RuntimeException("Interrupted", ex);
} catch (TimeoutException ex) {
DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
} catch (Exception ex) {
DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
}
return response;
}
// for asynchronous
@Override
public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {
final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
final org.springframework.util.concurrent.ListenableFuture orig =
restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);
orig.addCallback(
new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
DataStatusEnum.SUCCESS));
}
@Override
public void onFailure(Throwable ex) {
DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
DataStatusEnum.ERROR));
}
});
// propagate cancellation back to the original request
responseFuture.addListener(new Runnable() {
@Override public void run() {
if (responseFuture.isCancelled()) {
orig.cancel(false); // I am keeping this false for now
}
}
}, MoreExecutors.directExecutor());
return responseFuture;
}
}
客户会从他们的代码中这样调用 -
// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);
// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);
现在的问题是——
我们可以打断
AsyncRestTemplate
如果 http 请求花费的时间太长,请调用?我实际上是在调用cancel
在我的future
在我上面的代码中executeSync
方法,但我不确定如何验证它以确保它正在做它应该做的事情?我想将取消传播回原来的 future ,这样我就可以取消相应的 http 请求(我可能想要这样做以节省资源),这就是为什么我在我的 executeAsync 方法中添加了一个监听器的原因。我相信,我们不能打断RestTemplate
打电话但不确定AsyncRestTemplate
我们是否可以这样做。如果假设我们可以中断AsyncRestTemplate
通话,那么我是否正确地中断了 http 通话?或者有没有更好/更清洁的方法来做到这一点?还是我什至需要担心用AsyncRestTemplate
取消 Http 请求?我现在的设计?// propagate cancellation back to the original request responseFuture.addListener(new Runnable() { @Override public void run() { if (responseFuture.isCancelled()) { orig.cancel(false); // I am keeping this false for now } } }, MoreExecutors.directExecutor());
使用当前设置,我可以看到它在某些时候(不是每次)都抛出 CancellationException - 这是否意味着我的 HTTP 请求被取消了?
- 我也在
InterruptedException
的 catch block 中做正确的事在executeSync
方法?如果没有,那么处理它的正确方法是什么。我需要处理InterruptedException
就我而言? - 是不是默认
AsyncRestTamplete
每个线程使用阻塞调用和请求?如果是,那么有没有办法在我当前的设置中建立基于 NIO 的客户端连接?
任何解释/代码建议都会有很大帮助。
最佳答案
首先,您为什么使用 SettableFuture?为什么不能只返回 AsyncRestTemplate 返回的 ListenableFuture?
1. Can we interrupt AsyncRestTemplate call if http request is taking too long?
你当然知道!您只需要调用 Future.cancel
方法。此方法会中断 AsyncRestTemplate 实际使用的内部 RestTemplate 的执行。
2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?
正如 Phil 和 Danilo 所说,您不需要在 InterruptedException catch block 中中断当前线程。当必须取消请求的执行时,只需执行您需要执行的任何操作即可。
事实上,我建议你创建一个处理这种行为的方法,比如handleInterruption,并将这个方法用于TimeoutException
和InterruptedException
。
3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?
是的。 AsyncRestTamplete
的默认构造函数在内部使用 SimpleClientHttpRequestFactory
和 SimpleAsyncTaskExecutor
。
这个TaskExecutor总是对每一个任务启动一个威胁,并且从不重用线程,所以效率非常低:
* TaskExecutor implementation that fires up a new Thread for each task,
* executing it asynchronously.
*
* Supports limiting concurrent threads through the "concurrencyLimit"
* bean property. By default, the number of concurrent threads is unlimited.
*
* NOTE: This implementation does not reuse threads! Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
* executing a large number of short-lived tasks.
*
我建议你使用 AsyncRestTemplate 的另一种配置。
您应该使用使用另一个 TaskExecutor 的 AsyncRestTemplate 的构造函数:
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)
例如:
AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));
此 ExecutorService (Executors.newCachedThreadPool()) 会根据需要创建新线程,但会在可用时重用之前构建的线程。
或者更好的是,您可以使用另一个 RequestFactory。例如,您可以使用 HttpComponentsAsyncClientHttpRequestFactory
,内部使用 NIO,只需调用 AsyncRestTemplate 的正确构造函数:
new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())
不要忘记 AsyncRestTemplate 的内部行为将取决于您如何创建对象。
关于java - 如果花费太多时间,如何取消 AsyncRestTemplate HTTP 请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29380653/