java-8 - 通过示例了解 Java 中的移相器

标签 java-8 phaser

我想了解 Java 中的 Phaser。我写了一个提前卡住等待其他方到达的例子。

据我了解,phaser 被用作可重用的线程同步(不像 CountdownLatch 不可重用)具有屏障操作的屏障(不同于用于共享状态的 Cyclicbarrier,Phaser 不必在屏障中共享状态行动)。如果我错了,请纠正我。

因此,在我的示例中,我试图在一定数量的参与方/线程达到障碍后在每个线程中执行一些随机加法和减法代码。我做错了什么?

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        Phaser phaser = new Phaser(1);
        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Registering...%s",threadName));
        phaser.register();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}

最佳答案

问题是您无法从正在注册的任务中调用phaser.register()。使用移相器时,请始终遵循以下两条规则:

  1. 只有已注册的任务才能注册其他任务。这意味着任务无法注册自己。
  2. 所有已注册的任务必须在结束前取消注册。一个好的做法是使用移相器将代码包装在 finally block 周围,该 block 在最后取消注册(参见示例)。

这是您的固定程序(注意创建移相器的行):

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        // since we know beforehand how many tasks we have, initialize the
        // number of participants in the constructor; other wise register
        // *before* launching the task
        Phaser phaser = new Phaser(THREAD_POOL_SIZE);

        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}

关于java-8 - 通过示例了解 Java 中的移相器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31956018/

相关文章:

java - String.valueOf(int) 中的 ArrayOutOfBoundsException 怎么可能发生?

java - 使用和重用 Phaser 而不是 join()

java - 由于限制,灵活的 CountDownLatch 无法使用 Phaser

java - 将线程注册到 Phaser

Java8 : Is there a way to get an instance method reference from a class method reference?

java - 流的收集函数中组合器有什么用?

java - java中哪里使用屏障模式?

java - Phaser 类别和分层

java - 序列化兼容性 Java 7/8

java - 为什么 Apache Commons VFS 考虑 Http Proxy 和 Socks5 Proxy 而忽略 Socks4 Proxy?