java - 从调用方法并行执行方法

标签 java multithreading thread-safety executorservice callable

我有一个客户正在使用的图书馆,他们正在通过 DataRequest具有 userid 的对象, timeout以及其中的一些其他字段。现在我用这个 DataRequest对象创建 URL,然后我使用 RestTemplate 进行 HTTP 调用我的服务返回一个 JSON 响应,我用它来制作 DataResponse对象并返回此 DataResponse反对他们。

下面是我的DataClient客户通过传递使用的类 DataRequest反对它。我正在使用客户在 DataRequest 中传递的超时值如果在 getSyncData 中花费太多时间,则请求超时方法。

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask类:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

截至目前我的DataFetcherTask类负责一个DataRequest键如上所示..

问题陈述:-

现在我有一个小的设计更改。客户将通过DataRequest (例如 keyA)对象到我的库,然后我将使用 DataRequest 中存在的用户 ID 对另一个服务(我在当前设计中没有这样做)进行新的 http 调用。 (keyA) 对象,它将返回用户 ID 列表,因此我将使用这些用户 ID 并制作其他一些 DataRequest (keyB, keyC, keyD) 对象对应响应中返回的每个用户 ID。然后我会有 List<DataRequest>将具有 keyB、keyC 和 keyD 的对象 DataRequest目的。 List<DataRequest> 中的最大元素将是三个,仅此而已。

现在对于每个 DataRequest List<DataRequest> 中的对象我想执行上面的 DataFetcherTask.call并行方法然后制作List<DataResponse>通过添加每个 DataResponse对于每个键。所以我将对 DataFetcherTask.call 进行三个并行调用.此并行调用背后的想法是在相同的全局超时值中获取所有这些最多三个键的数据。

所以我的建议是 - DataFetcherTask类将返回 List<DataResponse>对象而不是 DataResponse然后是 getSyncData 的签名和 getAsyncData方法也会改变。所以这里是算法:

  • 使用客户传递的DataRequest 对象生成List<DataRequest>通过调用另一个 HTTP 服务。
  • 为每个 DataRequest 进行并行调用在 List<DataRequest>DataFetcherTask.call方法并返回 List<DataResponse>反对客户而不是DataResponse .

通过这种方式,我也可以在步骤 1 和步骤 2 上应用相同的全局超时。如果上述任何一步都需要时间,我们将在 getSyncData 中超时。方法。

DataFetcherTask设计更改后的类:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}

现在我的问题是 -

  • 一定要这样吗?解决这个问题的正确设计是什么?我的意思是有 call另一个方法 call方法看起来很奇怪?
  • 我们是否需要像我的代码中那样有两个执行程序?有没有更好的方法来解决这个问题,或者我们可以在这里做任何类型的简化/设计更改?

我已经简化了代码,这样我的想法就很清楚了。

最佳答案

正如您的问题的评论中已经提到的,您可以使用 Java 的 ForkJoin 框架。这将为您节省 DataFetcherTask 中的额外线程池。

您只需在 DataClient 中使用 ForkJoinPool 并将您的 DataFetcherTask 转换为 RecursiveTask(一个ForkJoinTask 的子类型)。这使您可以轻松地并行执行其他子任务。

因此,经过这些修改后,您的代码将如下所示:

DataFetcherTask

DataFetcherTask 现在是一个 RecursiveTask,它首先生成键并为每个生成的键调用子任务。这些子任务在与父任务相同的 ForkJoinPool 中执行。

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in contructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

数据客户端

DataClient 除了新的线程池外不会有太大变化:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responsList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responsList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}

使用 Java8 后,您可以考虑将实现更改为 CompletableFuture。然后它看起来像这样:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}

如评论中所述,Guava 的 ListenableFuture 将为 Java7 提供类似的功能,但如果没有 Lambda,它们往往会变得笨拙。

关于java - 从调用方法并行执行方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34316353/

相关文章:

python - 通过线程并行化缓慢的 api 调用

linux - Linux下如何高效运行短小的异步任务?

ios - 如何调试 wait_fences 错误

java - 刷新缓存分配线程安全

java - 当时间比较评估结果为真时,如何在 Java 中实时更新 Excel .csv 文件?

java - 如何使用 AsynchronousServerSocketChannel 绑定(bind)多个端口?

java - 为什么没有输入参数的PreparedStatement会成功,但带有输入参数的相同版本会失败?

java - get 和 set 方法的同步

Android线程和数据库锁定

java - 在 Eclipse 中重命名包