我注意到,取决于 doSth() 方法的实现(如果线程 hibernate 一段恒定或随机的时间),并行流的执行方式不同。
例子:
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static java.lang.System.out;
public class AtomicInt {
public static void main(String[] args) throws ExecutionException, InterruptedException {
out.println("Result: " + count());
}
public static int count() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
AtomicInteger counter = new AtomicInteger(0);
forkJoinPool.submit(() -> IntStream
.rangeClosed(1, 20)
.parallel()
.map(i -> doSth(counter))
.forEach(i -> out.println(">>>forEach: " + Thread.currentThread().getName() + " value: " + i))
).get();
return counter.get();
}
private static int doSth(AtomicInteger counter) {
try {
out.println(">>doSth1: " + Thread.currentThread().getName());
Thread.sleep(100 + new Random().nextInt(1000));
// Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int counterValue = counter.incrementAndGet();
out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);
return counterValue;
}
}
每个数字正在按顺序处理:
>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-10
>>doSth2: ForkJoinPool-1-worker-8 value: 1
>>>forEach: ForkJoinPool-1-worker-8 value: 1
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-15 value: 2
>>>forEach: ForkJoinPool-1-worker-15 value: 2
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-11 value: 3
>>>forEach: ForkJoinPool-1-worker-11 value: 3
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-2 value: 4
>>>forEach: ForkJoinPool-1-worker-2 value: 4
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-9 value: 5
>>>forEach: ForkJoinPool-1-worker-9 value: 5
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-11 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-1 value: 7
>>>forEach: ForkJoinPool-1-worker-1 value: 7
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 8
>>>forEach: ForkJoinPool-1-worker-15 value: 8
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-8 value: 9
>>>forEach: ForkJoinPool-1-worker-8 value: 9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-13 value: 10
>>>forEach: ForkJoinPool-1-worker-13 value: 10
>>doSth1: ForkJoinPool-1-worker-13
>>doSth2: ForkJoinPool-1-worker-9 value: 11
>>>forEach: ForkJoinPool-1-worker-9 value: 11
>>doSth2: ForkJoinPool-1-worker-15 value: 12
>>>forEach: ForkJoinPool-1-worker-15 value: 12
>>doSth2: ForkJoinPool-1-worker-10 value: 13
>>>forEach: ForkJoinPool-1-worker-10 value: 13
>>doSth2: ForkJoinPool-1-worker-4 value: 14
>>>forEach: ForkJoinPool-1-worker-4 value: 14
>>doSth2: ForkJoinPool-1-worker-6 value: 15
>>>forEach: ForkJoinPool-1-worker-6 value: 15
>>doSth2: ForkJoinPool-1-worker-11 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 16
>>doSth2: ForkJoinPool-1-worker-2 value: 17
>>>forEach: ForkJoinPool-1-worker-2 value: 17
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-13 value: 18
>>doSth2: ForkJoinPool-1-worker-1 value: 19
>>>forEach: ForkJoinPool-1-worker-1 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 20
>>>forEach: ForkJoinPool-1-worker-8 value: 20
Result: 20
当我将 doSth() 方法更改为始终 hibernate 1 秒而不是随机 hibernate 时,结果将被乱序计算:
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-1 value: 1
>>doSth2: ForkJoinPool-1-worker-10 value: 2
>>doSth2: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-10 value: 2
>>>forEach: ForkJoinPool-1-worker-1 value: 1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 4
>>doSth2: ForkJoinPool-1-worker-9 value: 10
>>doSth2: ForkJoinPool-1-worker-8 value: 7
>>>forEach: ForkJoinPool-1-worker-8 value: 7
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-4 value: 9
>>>forEach: ForkJoinPool-1-worker-4 value: 9
>>doSth2: ForkJoinPool-1-worker-11 value: 8
>>doSth2: ForkJoinPool-1-worker-13 value: 6
>>doSth2: ForkJoinPool-1-worker-2 value: 5
>>>forEach: ForkJoinPool-1-worker-13 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 8
>>doSth1: ForkJoinPool-1-worker-4
>>>forEach: ForkJoinPool-1-worker-9 value: 10
>>>forEach: ForkJoinPool-1-worker-15 value: 4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-13
>>>forEach: ForkJoinPool-1-worker-2 value: 5
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-10 value: 12
>>>forEach: ForkJoinPool-1-worker-10 value: 12
>>doSth2: ForkJoinPool-1-worker-6 value: 11
>>doSth2: ForkJoinPool-1-worker-1 value: 13
>>>forEach: ForkJoinPool-1-worker-6 value: 11
>>>forEach: ForkJoinPool-1-worker-1 value: 13
>>doSth2: ForkJoinPool-1-worker-9 value: 15
>>doSth2: ForkJoinPool-1-worker-2 value: 20
>>>forEach: ForkJoinPool-1-worker-2 value: 20
>>doSth2: ForkJoinPool-1-worker-15 value: 19
>>>forEach: ForkJoinPool-1-worker-15 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 14
>>doSth2: ForkJoinPool-1-worker-11 value: 17
>>doSth2: ForkJoinPool-1-worker-4 value: 16
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-4 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 17
>>>forEach: ForkJoinPool-1-worker-8 value: 14
>>>forEach: ForkJoinPool-1-worker-9 value: 15
>>>forEach: ForkJoinPool-1-worker-13 value: 18
Result: 20
这是巧合还是对这种行为有解释?
最佳答案
此时,您正在执行 sleep
语句,没有定义的顺序。虽然通过 IntStream.range
创建的流具有定义的遇到顺序,但您通过忽略实际的 int
值将操作变成无序操作.
产生可感知顺序的第一个 Action 是counter.incrementAndGet()
。在此之前,哪个线程到达那个点以及它与哪个流元素相关联并不重要。此时它从 AtomicInteger
中获取它的编号。之后,仅执行两个使用该号码的附加操作,即使用该号码打印两条消息。对于这两个不同的结果,重要的是这三个操作,counter.incrementAndGet()
和打印这两条消息,是否被另一个线程拦截。
我们可以很容易地将这种情况简化为
AtomicInteger counter = new AtomicInteger();
ExecutorService es = Executors.newFixedThreadPool(20);
es.invokeAll(Collections.nCopies(20, () -> {
out.println("1st: " + Thread.currentThread().getName());
Thread.sleep(100 + new Random().nextInt(1000));
// Thread.sleep(1000);
int counterValue = counter.incrementAndGet();
out.println("2nd: " + Thread.currentThread().getName() + " value: " + counterValue);
out.println("3rd: " + Thread.currentThread().getName() + " value: " + counterValue);
return null;
}));
es.shutdown();
请注意,对于 invokeAll
,根本没有定义顺序,但是,如前所述,这无关紧要。任务最迟在调用 incrementAndGet()
时获得分配的序列号。行为与流示例相同。
虽然我总是强调concurrent 并不意味着parallel,但由于未指定的执行时间和线程调度行为,仍然很有可能开始相同的短代码同时在没有后台 Activity 给 CPU 内核带来不可预测的工作负载时真正并行运行。
当所有线程并行运行时,它们同时命中了out.println
的内部同步,只有一个线程可以继续执行,其他线程被放入队列。然后,synchronized
的不公平 性质开始发挥作用。任意线程将获胜,之后,任意线程将被放回调度。这会导致数字以随机顺序打印。
当您让线程 hibernate 一段随机时间时,它们不再完全并行运行,从而增加了在不同时间到达打印语句的机会,从而能够无竞争地执行它们。哪个线程首先到达这一点是随机的,但是由于它们在 sleep 后分配了编号,因此第一个到达这一点的线程将获得编号,依此类推。
关于Java 并行流内部结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47500239/