java - 如何创建移相器的链/层

原文 标签 java multithreading executorservice phaser

我正在编写使用Phaser知道何时完成工作的多线程应用程序。问题在于,在ExecutorCompletionService中,一个队列中甚至可能有100k线程,但是Phaser中的最大未参与方数是65535。当到达65536方时,我该怎么办?

我的示例代码:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

问题出在addParties()方法中。单线程(SimpleParser)可以返回100个新文件名,并且将有100个新线程提交给ExecutorCompletionService,并且有100个新方在Phaser中注册。
我试图使用这样的东西:
if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

并创建一个相位器链,但这没有帮助,因为p.getUnarrivedParties()返回0,但是我无法为其注册下一个参与者...
    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

印刷品:

65535

0

并抛出IllegalStateException

那么,如何创建可以与此旧相连接的新相位器呢?

//编辑

谢谢@bowmore。
我还有两个问题。

让我们看一个例子:
import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child1.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

它打印:
Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

为什么在child1.arriveAndDeregister();之后? child1没有终止,如何检查它是否确实终止?

第二个问题。
我问过要参加65535个聚会之后再创建新的Phaser的原因,因为我认为创建数千个新对象是没有用的-您认为这样做不会有内存问题,或者甚至可能会降低性能吗?

最佳答案

不用向现有的Phaser注册,新进程可以在原始的新创建的子Phaser上注册。只需将父Phaser提供给子代的构造函数,即可创建子代Phaser

public static void addParties(Phaser p, List<String> filenames,
                              ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}

如果您只想在达到特定阈值时创建子相位器,则可以检查已注册方的数量,而不是未到达方的数量:
public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}

编辑:

跟踪问题1:子Phaser与根Phaser共享其终止状态,这是isTerminated()的实现
public boolean isTerminated() {
    return root.state < 0L;
}

跟踪问题2:父级相位器实际上并未保留对其子级相位器的引用。一旦不再引用子相位器,它就有资格进行垃圾回收。您最好最好遵循javadoc中的建议:

TASKS_PER_PHASER的最佳值主要取决于预期的同步速率。对于极小的每阶段任务主体(因此是高比率),低至4的值可能合适,而对于极大型的任务阶段,则最高可能为数百。

分层的主要原因是减少繁重的同步争用,因此,如果您的任务轻巧,则每个相位器的任务越少越好。剖析不同的设置来调整这些内容从来没有什么坏处。

相关文章:

java - 通过未修改的原子参考进行的突变可见吗?

multithreading - 如何使用Rayon将大范围划分为多个范围,并使每个线程都在一个范围内找到?

java - 修复并动态调整工作线程数

java - 为什么在多线程环境中用伪记录填充数组列表会花费两倍的时间?

java - “这个”指的是什么?

java - 如何在JSP页面的不同部分中调用Java方法

java - 获取Java中可选类型的列表

java - 上载大文件时,Jetty MultiPartFilter引发SocketTimeoutException

python - 阻止QThread阻止GUI

java - 使用ExecutorService的最有效方法