Java 8 ForkJoin 无法解释的输出

标签 java multithreading concurrency java-8 fork-join

我正在使用 ForkJoin 框架和 Java 8 accumulateAndGet 函数。以下程序产生了我无法解释的输出。可以吗?

P.S:这不是作业。这是来自 Cay S. Horstmann 的书“Java SE 8 for the Really Impatient”中的一个练习。这是一本好书。

/**
* Q1: Write a program that keeps track of the longest string that is observed by a number of threads. Use an
* {@code AtomicReference} and an appropriate accumulator.
* 
* @param observed
*            Longest string.
* @param x
*            String value to be compared to the longest value.
* @return Longest string.
*/
public static String updateLongestString(final AtomicReference<String> observed, final String x) {
    LOGGER.info("Received observed: {}, x: {}", observed, x);

    final String longestString = observed.accumulateAndGet(x,
        maxBy((str1, str2) -> observed.get().length() - x.length()));

    LOGGER.info("New observed: {}.", longestString);

    return longestString;
}

单元测试:

@Test
public void testUpdateLongestString() {
    final String[] words = new String[] { "Java 8", "Java 8 is Awesome!",
                "Java 8 is the Best thing Since Sliced Bread!", "Java 8 Changes Everything!" };
    final int len = words.length;
    final int stopAfter = 100;

    final AtomicReference<String> longestString = new AtomicReference<>(words[0]);
    final AtomicInteger count = new AtomicInteger(1);

    class UpdateLongestStringTask extends RecursiveAction {
        private static final long serialVersionUID = -2288401002001447054L;

        private int id = -1;

        private UpdateLongestStringTask(final int id) {
            this.id = id;
        }

        @Override
        protected void compute() {
            LOGGER.info("Executing task #: {}.", id);

            if (count.get() >= stopAfter) {
                return;
            }

            final ForkJoinTask<Void> task = new UpdateLongestStringTask(count.incrementAndGet()).fork();

            updateLongestString(longestString, words[randomIndex()]);

            task.join();
        }

        private int randomIndex() {             
            return ThreadLocalRandom.current().nextInt(len);
        }
    }

    /* Just because we can. */
    final int parallelism = min(getRuntime().availableProcessors(), 4);

    new ForkJoinPool(parallelism).invoke(new UpdateLongestStringTask(count.get()));
}

输出(标记为 -->> 的行无法解释;它为什么打印一个它显然从未收到过的值):

2015-01-05 23:20:00.974 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 1.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 2.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 3.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 4.
2015-01-05 23:20:00.981 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 Changes Everything!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is Awesome!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is the Best thing Since Sliced Bread!
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.

-->> 2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.

2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 is Awesome!.

最佳答案

如果您提供一个函数,该函数应该对其参数进行操作,即

final String longestString = observed.accumulateAndGet(x,
    maxBy((str1, str2) -> observed.get().length() - x.length()));

严重违反了函数式 API 的约定。 accumulateAndGet 将提供有关指定操作的原子更新,但不会提供有关函数内另一个 get 操作的原子更新。

不清楚您为什么这样做,因为实现正确的功能是直截了当的:

final String longestString = observed.accumulateAndGet(x,
    maxBy((str1, str2) -> str1.length() - str2.length()));

final String longestString =
    observed.accumulateAndGet(x, maxBy(comparingInt(String::length)));

请注意,在修复代码后,您仍然可能观察到与之前记录的两个值不同的结果,因为 accumulateAndGet 为您提供原子更新,但这种原子性不会扩展到您执行的记录操作 < em>在 调用accumulateAndGet 之前。 AtomicReference 的记录内容在执行原子更新时可能已过时。但由于更新契约(Contract)和您提供的运算符,生成的 String 的大小至少与您之前看到的值的最大值相同。

此外,请记住,感知到的日志操作顺序并不能说明执行更新操作的实际顺序。

您可以更好地了解更改代码时发生的情况,如下所示:

public static String updateLongestString(AtomicReference<String> observed, String x) {

    final String longestString = observed.accumulateAndGet(x, maxBy((str1, str2) -> {
        LOGGER.info("Received str1: {}, str2: {}", str1, str2);
        return str1.length() - str2.length();
    }));

    LOGGER.info("New observed: {}.", longestString);

    return longestString;
}

关于Java 8 ForkJoin 无法解释的输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27791975/

相关文章:

javascript - 为什么 Promise-Limit 在 Nodejs 中不起作用?

Cuda内核没有并发运行

java - 无法在多阶段 docker 构建中从一个阶段复制到另一个阶段

java - 使用类重新排列代码?

java - JOGL 校正球体中的光线和反射

c# - 通过单独的线程在表单上绘制

android - 从服务中的线程更改主 Activity 用户界面

c++ - 我在哪里使用 _endthreadex - Windows API

c# - 如何管理在 C# 中插入的并发性?

java - JSCH依次执行并打印命令输出