java - 用于并行化方法的执行器任务

标签 java multithreading future executorservice callable

所以我试图从磁盘上删除listDir中列出的n个文件,因此我将listDir分成4部分并让它从磁盘中并行删除。这里的目的是并行执行,以便使其快速而不是顺序执行。 deleteObject(x,credential,token) 可以假设为一个 API,它最终从磁盘中删除一个对象,并且是一个原子操作。成功删除时返回 true,否则返回 false

我有几个问题

  1. 因为我有 4 个并行方法,我正在通过 invokeAll 执行这些方法 Executors.newFixedThreadPool(4) 声明了 4 个线程 总会有 1 个线程分配给 1 个方法吗?
  2. 我是否需要在parallelDeleteOperation()方法的for循环中同步并使用迭代器“i”的 volatile 。我问这个问题的原因是假设第一个线程尚未完成其任务(删除 listDir1 并且 for 循环尚未完成),并假设在中途它进行了上下文切换并且第二个线程开始执行相同的任务(删除 listDir1)。只是想知道在这种情况下第二个线程是否可以获得 IndexOutOfBound 异常。
  3. 将列表分为 4 部分并执行它比让多个线程在非常大的列表上执行删除操作有什么优势吗?
  4. 如果 ExecutorService 的其中一项操作返回 false,则整个 deleteMain() API 将返回 false

     private boolean deleteMain(String parent, List<Structure>
        listDir, String place, String node, Sequence<String>
                                   groups, String Uid) throws IOException {
    
    int noCores = Runtime.getRuntime().availableProcessors();
    List<List<Integer>> splittedList = splitList(listDir, noCores);
    
    System.out.println(splittedList.size());
    
    System.out.println("NoOfCores" + noCores);
    Set<Callable<Boolean>> callables = new HashSet<Callable<Boolean>>();
    for (int i = 0; i < splittedList.size(); i++) {
        List<Integer> l = splittedList.get(i);
        callables.add(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
    
                return parallelDeleteOperation(parent, listDir, place, node, groups Uid);
            }
        });
    }
    
    ExecutorService service = Executors.newFixedThreadPool(noCores);
    
    try {
        List<Future<Boolean>> futures = service.invokeAll(callables);
    
    
        for (Future<Boolean> future : futures) {
            if (future.get() != true)
                return future.get();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    service.shutdown();
    return true;
    }
    
    private Boolean parallelDeleteOperation(String parent, List<Structure>
        listDir, String place, String node, Sequence<String>
                                                groups, String Uid) throws IOException {
    for (int i = 0; i < listDir.size(); i++) {
        final String name = listDir.get(i).filename;
    
        final String filePath = "/" + (parent.isEmpty() ? "" : (parent +
                "/")) + name;
        final DeleteMessage message = new DeleteMessage(name, place, node
                filePath);
        final boolean Status = delete(message, groups, Uid, place);
        if (Status != true)
            return Status;
    }
    return true;
    }
    

最佳答案

  1. So as I have 4 parallel method which I am executing though invokeAll and there are 4 threads declared by Executors.newFixedThreadPool(4) would there always be 1 thread be assigned to 1 method?

通常情况下,任务数量等于池中的线程数量就是正确的,但不能保证。这很大程度上取决于池中的任务。

        Executors.newFixedThreadPool(4).invokeAll(IntStream.range(0, 8).mapToObj(i -> (Callable<Integer>) () -> {
            System.out.println(Thread.currentThread().getName() + ": " + i);
            return 0;
        }).collect(Collectors.toList()));

如果池中有更多任务,如上所述,输出可能如下。没有明显的可预测规则来处理任务:

pool-1-thread-1: 0
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-1: 4
pool-1-thread-1: 5
pool-1-thread-1: 6
pool-1-thread-1: 7
pool-1-thread-4: 3
  1. Do I need to synchronize and use volatile for iterator 'i' in the for loop of parallelDeleteOperation() method.

不,你不需要。您已经将原始列表分成单独的四个列表

在您的代码中:

final List listDir1 = listDir.subList(0, listDir.size() / 4);

至于你的第三个问题:

  1. Is there any advantage of dividing list in 4 parts and executing this rather than having multiple threads executing delete operation on a very big list.

你最好在实际条件下进行一些测试。这太复杂了,不能简单地说它更好或不好。

当您在一个大列表上删除时,竞争条件可能比事先拆分列表更严重,这可能会产生额外的开销。

此外,即使没有任何并行性,其性能也可能不错。当涉及多用户系统时,由于线程上下文切换开销,并行版本可能比顺序版本更糟糕。

您必须测试它们,测试助手代码可以是:

    Long start = 0L;
    List<Long> list = new ArrayList<>();
    for (int i = 0; i < 1_000; ++i) {
        start = System.nanoTime();
        // your method to be tested;
        list.add(System.nanoTime() - start);
    }
    System.out.println("Time cost summary: " + list.stream().collect(Collectors.summarizingLong(Long::valueOf)));
  1. If one of the operation of ExecutorService returns false then on the whole deleteMain() API would return false

我想重构您的代码,因为这样可以使其更清晰,并满足您的最后一个要求(第 4 条):

// using the core as the count;
// since your task is CPU-bound, we can directly use parallelStream;
private static void testThreadPool(List<Integer> listDir) {
    // if one of the tasks failed, you got isFailed == true;
    boolean isFailed = splitList(listDir, Runtime.getRuntime().availableProcessors()).stream()
            .parallel().map(YourClass::parallelDeleteOperation).anyMatch(ret -> ret == false); // if any is false, it gives you false
}


// split up the list into "count" lists;
private static <T> List<List<T>> splitList(List<T> list, int count) {
    List<List<T>> listList = new ArrayList<>();
    for (int i = 0, blockSize = list.size() / count; i < count; ++i) {
        listList.add(list.subList(i * blockSize, Math.min((i+1) * blockSize, list.size()));
    }
    return listList;
}

关于java - 用于并行化方法的执行器任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51977247/

相关文章:

java - 将网页另存为图像

java - 这个循环怎么能退出呢?

java - 为什么 Java 中的日期操作要以毫秒为单位?

java - 按功能隐藏 Windows 离开/取消弹出窗口

java - postDelayed 是否导致消息跳到队列的前面?

java - 使用多线程递增/递减/打印(代码审查)

multithreading - for 循环中的 PhantomJS Node

scala - kotlin 中是否有 Future.sequence 模拟?

asynchronous - Flutter Future<dynamic> 与 Future<String> 子类型错误?

flutter - Dart:使用 Stream 而不是 Future 进行 API 调用