java - 并行化 for 循环并填充多个数据结构

标签 java multithreading thread-safety executorservice

我有一个要并行化的 for 循环。在下面的代码中,我迭代最外层的 for 循环并将条目放入各种数据结构中,并且工作正常。所有这些数据结构在同一个类中都有一个 getter,稍后我会在其他类的 for 循环中完成所有操作后使用该 getter 来获取所有详细信息。我正在填充 infoitemToNumberMappingcatToValueHoldertasksByCategorycatHolderitemIds 数据结构,它们也有 getter。

  // want to parallelize this for loop
  for (Task task : tasks) {
    if (task.getCategories().isEmpty() || task.getEventList() == null
        || task.getMetaInfo() == null) {
      continue;
    }
    String itemId = task.getEventList().getId();
    String categoryId = task.getCategories().get(0).getId();
    Processor fp = new Processor(siteId, itemId, categoryId, poolType);
    Map<String, Integer> holder = fp.getDataHolder();
    if (!holder.isEmpty()) {
      for (Map.Entry<String, Integer> entry : holder.entrySet()) {
        info.putIfAbsent(entry.getKey(), entry.getValue());
      }
      List<Integer> values = new ArrayList<>();
      for (String key : holder.keySet()) {
        values.add(info.get(key));
      }
      itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
      catToValueHolder.put(categoryId, StringUtils.join(values, ","));
    }
    Category cat = getCategory(task, holder.isEmpty());
    tasksByCategory.add(cat);
    LinkedList<String> ids = getCategoryIds(task);
    catHolder.put(categoryId, ids.getLast());
    itemIds.add(itemId);
  }

现在我知道如何并行化 for 循环,如下例所示,但令人困惑的是 - 就我而言,我没有像下面示例中的 output 这样的对象。就我而言,我有多个数据结构,通过迭代 for 循环来填充,所以我很困惑如何并行化最外层的 for 循环并仍然填充所有这些数据结构?

private final ExecutorService service = Executors.newFixedThreadPool(10);

List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
  Callable<Output> callable = new Callable<Output>() {
    public Output call() throws Exception {
      Output output = new Output();
      // process your input here and compute the output
      return output;
    }
  };
  futures.add(service.submit(callable));
}

service.shutdown();

List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
  outputs.add(future.get());
}

更新:-

我正在并行化 do while 循环内的 for 循环,并且我的 do while 循环运行直到 number 小于或等于 pages。所以也许我做得不正确。因为我的 do while 循环将运行直到所有页面都完成,并且对于每个页面,我有一个 for 循环,我试图并行化它,并且我设置它的方式,它给出了 rejectedexecutionexception

  private void check() {
    String endpoint = "some_url";
    int number = 1;
    int pages = 0;
    do {
      ExecutorService executorService = Executors.newFixedThreadPool(10);
      for (int i = 1; i <= retryCount; i++) {
        try {
          HttpEntity<String> requestEntity =
              new HttpEntity<String>(getBody(number), getHeader());
          ResponseEntity<String> responseEntity =
              HttpClient.getInstance().getClient()
                  .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
          String jsonInput = responseEntity.getBody();
          Process response = objectMapper.readValue(jsonInput, Process.class);
          pages = (int) response.getPaginationResponse().getTotalPages();
          List<Task> tasks = response.getTasks();
          if (pages <= 0 || tasks.isEmpty()) {
            continue;
          }
          // want to parallelize this for loop
          for (Task task : tasks) {
            Callable<Void> c = new Callable<>() {
              public void call() {
                if (!task.getCategories().isEmpty() && task.getEventList() != null
                    && task.getMetaInfo() != null) {
                    // my code here
                }
              }
            };
            executorService.submit(c);
          }
          // is this at right place? because I am getting rejectedexecutionexception
          executorService.shutdown();
          number++;
          break;
        } catch (Exception ex) {
          // log exception
        }
      }
    } while (number <= pages);
  }

最佳答案

您不必从并行代码中输出某些内容。您只需获取外循环的主体并为每个项目创建一个任务,如下所示:

for (Task task : tasks) {
   Callable<Void> c = new Callable<>() {
      public void call() {
         if (task.getCategories().isEmpty() || task.getEventList() == null || task.getMetaInfo() == null) {
               // ... rest of code here
          }
       }
    };
    executorService.submit(c);
 }

// wait for executor service, check for exceptions or whatever else you want to do here

关于java - 并行化 for 循环并填充多个数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48917750/

相关文章:

java - 循环混淆许多Android应用程序

java - 使用并发类并行处理目录中的文件

c++ - std::lock_guard 示例,解释其工作原理

java - 使用 JSOUP 将文档加载到 WebView

java - 如何仅使用 java 将 mp4 文件转换为 webm?

java - 像在 Hibernate 中一样选择

c# - Entity Framework 是否支持多线程?

java.lang.OutOfMemoryError : Java heap space

scala - 我的 Scala Actor 的属性是否应该标记为@volatile?

java - 你如何使用事件调度线程?