我有一个客户正在使用的图书馆,他们正在通过 DataRequest
具有 userid
的对象, timeout
以及其中的一些其他字段。现在我用这个 DataRequest
对象创建 URL,然后我使用 RestTemplate
进行 HTTP 调用我的服务返回一个 JSON 响应,我用它来制作 DataResponse
对象并返回此 DataResponse
反对他们。
下面是我的DataClient
客户通过传递使用的类 DataRequest
反对它。我正在使用客户在 DataRequest
中传递的超时值如果在 getSyncData
中花费太多时间,则请求超时方法。
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataRequest key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
DataFetcherTask
类:
public class DataFetcherTask implements Callable<DataResponse> {
private DataRequest key;
private RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
// In a nutshell below is what I am doing here.
// 1. Make an url using DataRequest key.
// 2. And then execute the url RestTemplate.
// 3. Make a DataResponse object and return it.
// I am calling this whole logic in call method as LogicA
}
}
截至目前我的DataFetcherTask
类负责一个DataRequest
键如上所示..
问题陈述:-
现在我有一个小的设计更改。客户将通过DataRequest
(例如 keyA)对象到我的库,然后我将使用 DataRequest
中存在的用户 ID 对另一个服务(我在当前设计中没有这样做)进行新的 http 调用。 (keyA) 对象,它将返回用户 ID 列表,因此我将使用这些用户 ID 并制作其他一些 DataRequest
(keyB, keyC, keyD) 对象对应响应中返回的每个用户 ID。然后我会有 List<DataRequest>
将具有 keyB、keyC 和 keyD 的对象 DataRequest
目的。 List<DataRequest>
中的最大元素将是三个,仅此而已。
现在对于每个 DataRequest
List<DataRequest>
中的对象我想执行上面的 DataFetcherTask.call
并行方法然后制作List<DataResponse>
通过添加每个 DataResponse
对于每个键。所以我将对 DataFetcherTask.call
进行三个并行调用.此并行调用背后的想法是在相同的全局超时值中获取所有这些最多三个键的数据。
所以我的建议是 - DataFetcherTask
类将返回 List<DataResponse>
对象而不是 DataResponse
然后是 getSyncData
的签名和 getAsyncData
方法也会改变。所以这里是算法:
- 使用客户传递的DataRequest 对象生成
List<DataRequest>
通过调用另一个 HTTP 服务。 - 为每个
DataRequest
进行并行调用在List<DataRequest>
至DataFetcherTask.call
方法并返回List<DataResponse>
反对客户而不是DataResponse
.
通过这种方式,我也可以在步骤 1 和步骤 2 上应用相同的全局超时。如果上述任何一步都需要时间,我们将在 getSyncData
中超时。方法。
DataFetcherTask
设计更改后的类:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
// second executor here
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = generateKeys();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataRequest key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
}
}
现在我的问题是 -
- 一定要这样吗?解决这个问题的正确设计是什么?我的意思是有
call
另一个方法call
方法看起来很奇怪? - 我们是否需要像我的代码中那样有两个执行程序?有没有更好的方法来解决这个问题,或者我们可以在这里做任何类型的简化/设计更改?
我已经简化了代码,这样我的想法就很清楚了。
最佳答案
正如您的问题的评论中已经提到的,您可以使用 Java 的 ForkJoin 框架。这将为您节省 DataFetcherTask
中的额外线程池。
您只需在 DataClient
中使用 ForkJoinPool
并将您的 DataFetcherTask
转换为 RecursiveTask
(一个ForkJoinTask
的子类型)。这使您可以轻松地并行执行其他子任务。
因此,经过这些修改后,您的代码将如下所示:
DataFetcherTask
DataFetcherTask
现在是一个 RecursiveTask
,它首先生成键并为每个生成的键调用子任务。这些子任务在与父任务相同的 ForkJoinPool
中执行。
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
// TODO - Handle exception properly
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
}
}
}
数据客户端
DataClient
除了新的线程池外不会有太大变化:
public class DataClient implements Client {
private final RestTemplate restTemplate = new RestTemplate();
// Replace the ExecutorService with a ForkJoinPool
private final ForkJoinPool service = new ForkJoinPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = getAsyncData(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
return this.service.submit(task);
}
}
使用 Java8 后,您可以考虑将实现更改为 CompletableFuture
。然后它看起来像这样:
DataClientCF
public class DataClientCF {
private final RestTemplate restTemplate = new RestTemplate();
private final ExecutorService executor = Executors.newFixedThreadPool(15);
public List<DataResponse> getData(DataRequest initialKey) {
return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
.thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
.thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
.exceptionally(t -> { throw new RuntimeException(t); })
.join();
}
private List<DataRequest> generateKeys(DataRequest key) {
return new ArrayList<>();
}
private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
}
}
如评论中所述,Guava 的 ListenableFuture
将为 Java7 提供类似的功能,但如果没有 Lambda,它们往往会变得笨拙。
关于java - 从调用方法并行执行方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34316353/