java - 一旦发现 findAny 匹配,如何停止并行流?

标签 java parallel-processing java-stream completable-future

我正在尝试查找列表中与给定谓词匹配的第一个(任何)成员,如下所示:

Item item = items.parallelStream()
  .map(i -> i.doSomethingExpensive())
  .filter(predicate)
  .findAny()
  .orElse(null);

我希望一旦 findAny() 获得匹配,它会立即返回,但情况似乎并非如此。相反,它似乎等待 map 方法完成大多数元素后再返回。如何立即返回第一个结果并取消其他并行流?有没有比使用诸如 CompletableFuture 之类的流更好的方法来做到这一点?

这是一个显示行为的简单示例:

private static void log(String msg) {
    private static void log(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(sdf.format(new Date()) + " " + msg);
}

Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
  .map(n -> {
    long delay = Math.abs(random.nextLong()) % 10000;
    log("Waiting on " + n + " for " + delay + " ms");
    try { Thread.sleep(delay); }
    catch (InterruptedException e) { System.err.println("Interruption error"); }
    return n * n;
  })
  .filter(n -> n < 30)
  .peek(n -> log("Found match: " + n))
  .findAny();

log("First match: " + num);

日志输出:

14:52:27.061 Waiting on 9 for 2271 ms
14:52:27.061 Waiting on 2 for 1124 ms
14:52:27.061 Waiting on 13 for 547 ms
14:52:27.061 Waiting on 4 for 517 ms
14:52:27.061 Waiting on 1 for 1210 ms
14:52:27.061 Waiting on 6 for 2646 ms
14:52:27.061 Waiting on 0 for 4393 ms
14:52:27.061 Waiting on 12 for 5520 ms
14:52:27.581 Found match: 16
14:52:27.582 Waiting on 3 for 5365 ms
14:52:28.188 Found match: 4
14:52:28.275 Found match: 1
14:52:31.457 Found match: 0
14:52:32.950 Found match: 9
14:52:32.951 First match: Optional[0]

一旦找到匹配项(在本例中为 16),findAny() 不会立即返回,而是阻塞直到剩余线程完成。在这种情况下,调用者在找到匹配项后返回之前会额外等待 5 秒。

最佳答案

Instead it seems to wait for the map method to finish on most of the elements before returning.

这是不正确的。

当谈到已经正在处理的元素时,它将等待所有元素完成,因为 Stream API 允许并发处理本质上不是线程安全的数据结构。它必须确保在从终端操作返回之前所有潜在的并发访问已完成。

当谈论整个流时,在 8 核机器上测试仅包含 14 个元素的流是不公平的。当然,至少会启动 8 个并发操作,仅此而已。您使用 findFirst() 而不是 findAny() 来火上浇油,因为这并不意味着按处理顺序返回第一个找到的元素,而是返回第一个元素按遇到顺序排列,即在您的示例中恰好为零,因此处理第一个 block 之外的其他 block 的线程不能假设它们的结果是正确的答案,并且比使用 findAny()< 更愿意帮助处理其他候选者.

当您使用时

List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
        .map(n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        })
        .filter(n -> n < 40_000)
        .peek(n -> log("Found match: " + n))
        .findAny();

log("First match: " + num);

尽管流元素的数量要多得多,但您将获得相似数量的运行完成任务。

请注意,CompletableFuture 也不支持中断,因此我想到的唯一用于返回任何结果并取消其他作业的内置功能是旧的 ExecutorService.invokeAny .

要为其构建映射和过滤功能,我们可以使用以下辅助函数:

static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
    return () -> {
        R r = f.apply(t);
        if(!p.test(r)) throw new NoSuchElementException();
        return r;
    };
}

不幸的是,只能选择以值或异常完成,因此我们必须对不匹配的元素使用异常。

然后我们就可以这样使用它

ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
    .mapToObj(i -> mapAndfilter(i,
        n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        },
        n -> n < 10_000))
    .collect(Collectors.toList()));

log("result: "+result);

它不仅会取消待处理的任务,还会返回而不等待它们完成。

当然,这意味着源数据以及所操作的作业必须是不可变的或线程安全的。

关于java - 一旦发现 findAny 匹配,如何停止并行流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52411986/

相关文章:

Java8 Stream List<Map<String,Object>> 分组依据和计数值

java - 使用 EXACTLY 1 流查找数组中元素对的最大差异

java - jetty worker 。使用基于主机 ID 的许可证运行软件

java - 使位图的触摸区域变为透明

r - 为什么这个 for 循环不能并行工作?

r - 在 foreach R 中使用列表

java - 为什么 Spring Boot Starter Mongodb Reactive 不保存我的实体的列表字段?

java - 我将如何创建对象引用变量数组?

java - 在 Java 中使用线程时维护 FIFO

java - 使用 Java 8 JDK 将 Iterable 转换为 Stream