我有一个要并行化的 for 循环。在下面的代码中,我迭代最外层的 for 循环并将条目放入各种数据结构中,并且工作正常。所有这些数据结构在同一个类中都有一个 getter,稍后我会在其他类的 for 循环中完成所有操作后使用该 getter 来获取所有详细信息。我正在填充 info
、itemToNumberMapping
、catToValueHolder
、tasksByCategory
、catHolder
、itemIds
数据结构,它们也有 getter。
// want to parallelize this for loop
for (Task task : tasks) {
if (task.getCategories().isEmpty() || task.getEventList() == null
|| task.getMetaInfo() == null) {
continue;
}
String itemId = task.getEventList().getId();
String categoryId = task.getCategories().get(0).getId();
Processor fp = new Processor(siteId, itemId, categoryId, poolType);
Map<String, Integer> holder = fp.getDataHolder();
if (!holder.isEmpty()) {
for (Map.Entry<String, Integer> entry : holder.entrySet()) {
info.putIfAbsent(entry.getKey(), entry.getValue());
}
List<Integer> values = new ArrayList<>();
for (String key : holder.keySet()) {
values.add(info.get(key));
}
itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
catToValueHolder.put(categoryId, StringUtils.join(values, ","));
}
Category cat = getCategory(task, holder.isEmpty());
tasksByCategory.add(cat);
LinkedList<String> ids = getCategoryIds(task);
catHolder.put(categoryId, ids.getLast());
itemIds.add(itemId);
}
现在我知道如何并行化 for 循环,如下例所示,但令人困惑的是 - 就我而言,我没有像下面示例中的 output
这样的对象。就我而言,我有多个数据结构,通过迭代 for 循环来填充,所以我很困惑如何并行化最外层的 for 循环并仍然填充所有这些数据结构?
private final ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
Callable<Output> callable = new Callable<Output>() {
public Output call() throws Exception {
Output output = new Output();
// process your input here and compute the output
return output;
}
};
futures.add(service.submit(callable));
}
service.shutdown();
List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
outputs.add(future.get());
}
更新:-
我正在并行化 do while 循环内的 for 循环,并且我的 do while 循环运行直到 number
小于或等于 pages
。所以也许我做得不正确。因为我的 do while 循环将运行直到所有页面都完成,并且对于每个页面,我有一个 for 循环,我试图并行化它,并且我设置它的方式,它给出了 rejectedexecutionexception
。
private void check() {
String endpoint = "some_url";
int number = 1;
int pages = 0;
do {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 1; i <= retryCount; i++) {
try {
HttpEntity<String> requestEntity =
new HttpEntity<String>(getBody(number), getHeader());
ResponseEntity<String> responseEntity =
HttpClient.getInstance().getClient()
.exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
String jsonInput = responseEntity.getBody();
Process response = objectMapper.readValue(jsonInput, Process.class);
pages = (int) response.getPaginationResponse().getTotalPages();
List<Task> tasks = response.getTasks();
if (pages <= 0 || tasks.isEmpty()) {
continue;
}
// want to parallelize this for loop
for (Task task : tasks) {
Callable<Void> c = new Callable<>() {
public void call() {
if (!task.getCategories().isEmpty() && task.getEventList() != null
&& task.getMetaInfo() != null) {
// my code here
}
}
};
executorService.submit(c);
}
// is this at right place? because I am getting rejectedexecutionexception
executorService.shutdown();
number++;
break;
} catch (Exception ex) {
// log exception
}
}
} while (number <= pages);
}
最佳答案
您不必从并行代码中输出某些内容。您只需获取外循环的主体并为每个项目创建一个任务,如下所示:
for (Task task : tasks) {
Callable<Void> c = new Callable<>() {
public void call() {
if (task.getCategories().isEmpty() || task.getEventList() == null || task.getMetaInfo() == null) {
// ... rest of code here
}
}
};
executorService.submit(c);
}
// wait for executor service, check for exceptions or whatever else you want to do here
关于java - 并行化 for 循环并填充多个数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48917750/