java - 如何创建相位器链/层

标签 java multithreading executorservice phaser

我正在编写多线程应用程序,它使用 Phaser 来知道何时完成工作。问题是,在 ExecutorCompletionService 中,队列中甚至可以有 100k 的线程,但在 Phaser 中,最大未到达方数是 65535。当到达 65536 方时,我该怎么办?

我的示例代码:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

问题出在 addParties() 方法中。单线程(SimpleParser)可以返回即100个新文件名,将有100个新线程提交给ExecutorCompletionService,100个新方在Phaser中注册。 我试过使用这样的东西:

if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

并创建一个移相器链,但它没有帮助,因为 p.getUnarrivedParties() 返回 0,但我无法注册下一个派对...

    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

打印:

65535

0

并抛出 IllegalStateException

那么我怎样才能创建与旧 Phaser 连接的新 Phaser?

//编辑

谢谢@bowmore。 我还有两个问题。

让我们看例子:

import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child1.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

它打印:

Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

为什么在 child1.arriveAndDeregister() 之后; child1 没有终止,如何检查它是否真的终止了?

第二个问题。 我询问了在达到 65535 个派对后创建新 Phaser 的问题,因为我认为创建数千个新对象是没有用的 - 你认为这样做不会有内存问题,或者它甚至可以提高性能吗?

最佳答案

新进程可以在原始的新创建的子 Phaser 上注册,而不是在现有的 Phaser 上注册。只需将父级 Phaser 提供给子级的构造函数,即可创建子级 Phaser

public static void addParties(Phaser p, List<String> filenames,
                              ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}

如果你只想在达到某个阈值时创建子Phasers,你可以检查注册方的数量,而不是未到达的数量:

public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}

编辑:

跟进问题 1:子 Phaser 与根 Phaser 共享它们的终止状态,这里是 isTerminated() 的实现/p>

public boolean isTerminated() {
    return root.state < 0L;
}

跟进问题 2:父移相器实际上并不保留对其子移相器的引用。一旦不再引用子移相器,它就可以进行垃圾回收。您最好遵循 javadoc 中的建议:

The best value of TASKS_PER_PHASER depends mainly on expected synchronization rates. A value as low as four may be appropriate for extremely small per-phase task bodies (thus high rates), or up to hundreds for extremely large ones.

分层的主要原因是减少严重的同步争用,因此如果您有轻量级任务,每个移相器的任务越少越好。分析不同的设置来调整这些东西永远不会有坏处。

关于java - 如何创建相位器链/层,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14093200/

相关文章:

java - 无法在数组或列表中使用对象

java - ExecutorService 似乎在 UI 上运行线程?

java - cachedThreadPool 没有按我的预期工作

java - 超时的完整 future 不起作用

java - 使用 Hadoop 自定义数据类型时出现 EOF 异常

Java 使用 ProcessBuilder 以静默方式运行程序

java代码错误

Java:测试服务的并发性

multithreading - 如何处理消息的并行处理?

c# - 安全限制并行、Foreach 循环中的类调用数量