Java parallel stream internals

Tags: java multithreading parallel-processing java-8 java-stream

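I noticed that the parallel stream behaves differently depending on how doSth() is implemented, specifically whether the thread sleeps for a constant or a random amount of time.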

Example:

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static java.lang.System.out;

public class AtomicInt {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        out.println("Result: " + count());
    }

    public static int count() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(10);

        AtomicInteger counter = new AtomicInteger(0);

        forkJoinPool.submit(() -> IntStream
                .rangeClosed(1, 20)
                .parallel()
                .map(i -> doSth(counter))
                .forEach(i -> out.println(">>>forEach: " + Thread.currentThread().getName() + " value: " + i))
        ).get();

        return counter.get();
    }

    private static int doSth(AtomicInteger counter) {
        try {
            out.println(">>doSth1: " + Thread.currentThread().getName());
            Thread.sleep(100 + new Random().nextInt(1000));
//            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        int counterValue = counter.incrementAndGet();
        out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);

        return counterValue;
    }
}

Every number is processed in order:

>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-10
>>doSth2: ForkJoinPool-1-worker-8 value: 1
>>>forEach: ForkJoinPool-1-worker-8 value: 1
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-15 value: 2
>>>forEach: ForkJoinPool-1-worker-15 value: 2
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-11 value: 3
>>>forEach: ForkJoinPool-1-worker-11 value: 3
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-2 value: 4
>>>forEach: ForkJoinPool-1-worker-2 value: 4
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-9 value: 5
>>>forEach: ForkJoinPool-1-worker-9 value: 5
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-11 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-1 value: 7
>>>forEach: ForkJoinPool-1-worker-1 value: 7
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 8
>>>forEach: ForkJoinPool-1-worker-15 value: 8
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-8 value: 9
>>>forEach: ForkJoinPool-1-worker-8 value: 9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-13 value: 10
>>>forEach: ForkJoinPool-1-worker-13 value: 10
>>doSth1: ForkJoinPool-1-worker-13
>>doSth2: ForkJoinPool-1-worker-9 value: 11
>>>forEach: ForkJoinPool-1-worker-9 value: 11
>>doSth2: ForkJoinPool-1-worker-15 value: 12
>>>forEach: ForkJoinPool-1-worker-15 value: 12
>>doSth2: ForkJoinPool-1-worker-10 value: 13
>>>forEach: ForkJoinPool-1-worker-10 value: 13
>>doSth2: ForkJoinPool-1-worker-4 value: 14
>>>forEach: ForkJoinPool-1-worker-4 value: 14
>>doSth2: ForkJoinPool-1-worker-6 value: 15
>>>forEach: ForkJoinPool-1-worker-6 value: 15
>>doSth2: ForkJoinPool-1-worker-11 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 16
>>doSth2: ForkJoinPool-1-worker-2 value: 17
>>>forEach: ForkJoinPool-1-worker-2 value: 17
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-13 value: 18
>>doSth2: ForkJoinPool-1-worker-1 value: 19
>>>forEach: ForkJoinPool-1-worker-1 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 20
>>>forEach: ForkJoinPool-1-worker-8 value: 20
Result: 20

When I change doSth() to always sleep for 1 second instead of a random time, the results are printed out of order:

>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-1 value: 1
>>doSth2: ForkJoinPool-1-worker-10 value: 2
>>doSth2: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-10 value: 2
>>>forEach: ForkJoinPool-1-worker-1 value: 1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 4
>>doSth2: ForkJoinPool-1-worker-9 value: 10
>>doSth2: ForkJoinPool-1-worker-8 value: 7
>>>forEach: ForkJoinPool-1-worker-8 value: 7
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-4 value: 9
>>>forEach: ForkJoinPool-1-worker-4 value: 9
>>doSth2: ForkJoinPool-1-worker-11 value: 8
>>doSth2: ForkJoinPool-1-worker-13 value: 6
>>doSth2: ForkJoinPool-1-worker-2 value: 5
>>>forEach: ForkJoinPool-1-worker-13 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 8
>>doSth1: ForkJoinPool-1-worker-4
>>>forEach: ForkJoinPool-1-worker-9 value: 10
>>>forEach: ForkJoinPool-1-worker-15 value: 4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-13
>>>forEach: ForkJoinPool-1-worker-2 value: 5
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-10 value: 12
>>>forEach: ForkJoinPool-1-worker-10 value: 12
>>doSth2: ForkJoinPool-1-worker-6 value: 11
>>doSth2: ForkJoinPool-1-worker-1 value: 13
>>>forEach: ForkJoinPool-1-worker-6 value: 11
>>>forEach: ForkJoinPool-1-worker-1 value: 13
>>doSth2: ForkJoinPool-1-worker-9 value: 15
>>doSth2: ForkJoinPool-1-worker-2 value: 20
>>>forEach: ForkJoinPool-1-worker-2 value: 20
>>doSth2: ForkJoinPool-1-worker-15 value: 19
>>>forEach: ForkJoinPool-1-worker-15 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 14
>>doSth2: ForkJoinPool-1-worker-11 value: 17
>>doSth2: ForkJoinPool-1-worker-4 value: 16
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-4 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 17
>>>forEach: ForkJoinPool-1-worker-8 value: 14
>>>forEach: ForkJoinPool-1-worker-9 value: 15
>>>forEach: ForkJoinPool-1-worker-13 value: 18
Result: 20

Is this a coincidence, or is there an explanation for this behavior?

Best answer

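At the point where you execute the sleep statement, there is no defined order. While the stream created via IntStream.rangeClosed(1, 20) has a defined encounter order, you turned the operation into an unordered one by ignoring the actual int value.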
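The encounter-order point can be checked with a small sketch (the class EncounterOrderDemo and its methods are hypothetical, not part of the question or the answer). When map actually uses the element value, each result stays tied to its position, and an order-respecting terminal operation such as collect(Collectors.toList()) or forEachOrdered reports the results in encounter order even though the work runs in parallel:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrderDemo {

    // map() uses the element value, so each result is tied to its position
    // in the encounter order; collect(toList()) keeps that order even
    // though the elements are processed in parallel.
    static List<Integer> collected() {
        return IntStream.rangeClosed(1, 20)
                .parallel()
                .map(i -> i * 10)
                .boxed()
                .collect(Collectors.toList());
    }

    // forEachOrdered() also respects encounter order and guarantees that the
    // action for one element happens-before the action for the next, so a
    // plain ArrayList is safe here. forEach() would give no such guarantee.
    static List<Integer> viaForEachOrdered() {
        List<Integer> result = new ArrayList<>();
        IntStream.rangeClosed(1, 20)
                .parallel()
                .map(i -> i * 10)
                .forEachOrdered(result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collected());
        System.out.println(viaForEachOrdered());
    }
}
```

Replacing forEachOrdered with forEach in the second method would make the order unspecified again. The sketch uses the common pool and ignores the custom ForkJoinPool from the question.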

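The first action that establishes a perceivable order is counter.incrementAndGet(). Before that, it doesn't matter which thread reaches that point or which stream element it is associated with. At that point, the thread takes its number from the AtomicInteger. Afterwards, only two more actions use that number: the two messages that print it. What distinguishes the two outcomes is whether these three actions, counter.incrementAndGet() and the two print statements, are interleaved with actions of another thread.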
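The claim that only the interleaving of these three actions matters can be tested with a sketch (hypothetical class AtomicSectionDemo; it uses the common pool rather than the question's custom ForkJoinPool, and it also records each value in a list so the order can be checked). If taking the number and printing it happen inside one synchronized block, no other thread can get in between, so the output is always ascending:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class AtomicSectionDemo {

    private static final Object LOCK = new Object();

    // Same shape as the question's pipeline, but "take a number" and
    // "print it" happen inside one synchronized block. No other thread can
    // run between incrementAndGet() and the output, so the recorded values
    // are ascending no matter how the elements are scheduled.
    static List<Integer> run() {
        AtomicInteger counter = new AtomicInteger();
        List<Integer> log = new ArrayList<>(); // guarded by LOCK
        IntStream.rangeClosed(1, 20)
                .parallel()
                .forEach(i -> {
                    synchronized (LOCK) {
                        int value = counter.incrementAndGet();
                        System.out.println(Thread.currentThread().getName() + " value: " + value);
                        log.add(value);
                    }
                });
        synchronized (LOCK) {
            return new ArrayList<>(log); // copy taken under the same lock
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This serializes the whole body and therefore gives up the parallelism; it only isolates the cause and is not a recommended pattern.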

We can easily reduce the scenario to:

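import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.out;

public class Simplified {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(20);
        // 20 identical tasks; invokeAll imposes no order on them
        es.invokeAll(Collections.nCopies(20, () -> {
            out.println("1st: " + Thread.currentThread().getName());
            Thread.sleep(100 + new Random().nextInt(1000));
//          Thread.sleep(1000);
            int counterValue = counter.incrementAndGet(); // number assigned only here
            out.println("2nd: " + Thread.currentThread().getName() + " value: " + counterValue);
            out.println("3rd: " + Thread.currentThread().getName() + " value: " + counterValue);
            return null;
        }));
        es.shutdown();
    }
}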

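Note that invokeAll defines no order at all, but, as explained above, that doesn't matter: each task gets its sequence number only when it calls incrementAndGet(). The behavior is the same as in the stream example.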


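While I always emphasize that concurrent doesn't imply parallel, and execution timing and thread scheduling are unspecified, there is still a high chance that the same short code, started at the same time, will truly run in parallel when no background activity puts an unpredictable workload on the CPU cores.

When all threads run in parallel, they hit the internal synchronization of out.println at the same time. Only one thread can proceed; the others are put into a queue. Then the unfair nature of synchronized comes into play: an arbitrary thread wins, and afterwards, arbitrary threads are put back into scheduling. This causes the numbers to be printed in random order.

When you let the threads sleep for a random amount of time, they no longer run perfectly in parallel, which increases the chance that they reach the print statements at different times and can execute them without contention. Which thread reaches that point first is random, but since the numbers are assigned after the sleep, the first thread to arrive gets the first number, and so on.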
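A rough way to reproduce both situations without streams is the sketch below (hypothetical class ArrivalDemo; a synchronized list stands in for the internal lock of System.out.println, and a CyclicBarrier replaces the constant 1-second sleep as a way to release all tasks at the same moment). It counts adjacent values that were recorded out of order. Whether the simultaneous case actually shows inversions depends on the core count, the JVM and the system load, so any particular count is illustrative only:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class ArrivalDemo {

    // Each task takes a number, then appends it to a synchronized list
    // (a stand-in for the lock inside System.out.println).
    // randomSleep == false: tasks leave the barrier together and tend to
    //   contend for the list's lock (like the constant 1-second sleep).
    // randomSleep == true: tasks then sleep 100..1099 ms and tend to
    //   arrive one after another (like the random sleep).
    static List<Integer> run(int tasks, boolean randomSleep) throws Exception {
        CyclicBarrier start = new CyclicBarrier(tasks);
        AtomicInteger counter = new AtomicInteger();
        List<Integer> printed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService es = Executors.newFixedThreadPool(tasks); // one thread per task, so the barrier can trip
        try {
            List<Future<Object>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                futures.add(es.submit(() -> {
                    start.await();
                    if (randomSleep) {
                        Thread.sleep(100 + ThreadLocalRandom.current().nextInt(1000));
                    }
                    int value = counter.incrementAndGet();
                    printed.add(value);
                    return null;
                }));
            }
            for (Future<Object> f : futures) {
                f.get(); // wait for completion and propagate failures
            }
        } finally {
            es.shutdown();
        }
        return new ArrayList<>(printed);
    }

    // Counts adjacent pairs that were recorded in descending order.
    static int inversions(List<Integer> values) {
        int n = 0;
        for (int k = 1; k < values.size(); k++) {
            if (values.get(k - 1) > values.get(k)) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("together: " + inversions(run(20, false)));
        System.out.println("random:   " + inversions(run(20, true)));
    }
}
```

A sorted copy of either result always contains 1 to 20 exactly once; only the recording order can differ between runs.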

A similar question about Java parallel stream internals can be found on Stack Overflow: https://stackoverflow.com/questions/47500239/
