java - 如何使用流并行和过滤器以及findFirst短路

标签 java filter parallel-processing stream findfirst

我有一个具有不同复杂性(处理持续时间)的 TxnType 列表。
我想从列表中找到匹配的 TxnType
我尝试通过混合流的并行处理和短路过滤功能来实现它。
但我注意到它们没有混合。
我写了下面的示例。但注意到并联和短路的混合无法正常工作。
每次运行都显示并行处理正在工作,但一旦找到项目就不会终止!!!

    class TxnType {
        public String id;   
        public TxnType(String id) {this.id = id;}
       
        public boolean match(String entry) {
            Date s = new Date();
            // simulate long processing match time TxnType
            if (id.equals("1")) {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Date f = new Date();
            System.out.println("check id = " + id+ "  duration = "+(f.getTime()- s.getTime()));
    
            return id.equalsIgnoreCase(entry);
        }
    }

     private void test4() {
        // build list of available TxnTypes
        ArrayList<TxnType> lst = new ArrayList<>();
        lst.add(new TxnType("0"));
        lst.add(new TxnType("1"));  // long match processing time type
        lst.add(new TxnType("2"));
        lst.add(new TxnType("3"));
        lst.add(new TxnType("4"));

        String searchFor = "3";
        System.out.println("searchFor = " + searchFor);
        Date st, fi;

        st = new Date();
        Optional<TxnType> found2 =lst.stream().parallel().filter(txnType->txnType.match(searchFor)).findFirst();
            System.out.println("found.stream().count() = " + found2.stream().count());
            fi= new Date();
            System.out.println("dur="+ (fi.getTime()- st.getTime()));
    }

通过多次运行,发现没有尽快终止处理,等待全部处理完!!!

searchFor = 3
check id = 4  duration = 0
check id = 2  duration = 0
check id = 3  duration = 0
check id = 0  duration = 0
check id = 1  duration = 4005
found.stream().count() = 1
dur=4050

有类似 FilterFindFirst() 的东西吗?

最佳答案

您的错误是使用findFirst,而不是findAny

请注意,1 排在您希望找到的元素 (3) 之前。因此,它必须先完成对 1 的检查,然后才能得出“3 是与谓词匹配的第一个元素”的结论,即使它们是并行完成。如果它找到了3,并且还没有开始检查列表中的其他内容,那么它就不会开始检查。这就是 findFirst 中短路的含义。

另一方面,

findAny 不关心顺序。如果它发现任何满足谓词的元素,它就不会再开始检查任何新内容。

现在,即使您更改为 findAny,您可能仍然会发现需要 4 秒才能完成。这是因为与流管道可以创建的线程数量相比,列表中的元素太少。这样所有元素的处理就开始了,一旦开始就不会被中断,即使它已经找到了满足谓词的元素。

如果将更多元素放入列表中:

for (int i = 0 ; i < 100 ; i++) {
    lst.add(new TxnType("foo"));
}

...

Optional<TxnType> found2 = lst.parallelStream().filter(txnType -> txnType.match(searchFor)).findAny();

那么在 3 处理完成之前就不太可能开始处理 1,并且您将获得更快的运行速度。但这不会每次都会发生。无法保证 1 不会在 3 之前得到处理。

基本上,短路工作正常。就是这样

  • findFirst 不会像您希望的那样频繁短路
  • 您的列表中的元素太少,而您的计算机有足够的内核来一次处理所有这些元素,所以确实如此。

关于java - 如何使用流并行和过滤器以及findFirst短路,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68140233/

相关文章:

java - 为什么没有 OptionalDouble orElseNull() 或更好的解决方案

c# - 你会如何简单地 Monitor.TryEnter

java - Android - 错误膨胀类 fragment - XML 文件

filter - 使用 ffmpeg 在同一图像上使用两次淡入/淡出

powershell - 什么是文件系统提供程序过滤器语法?

angularjs - ng-options 和唯一过滤器不显示 angular.js

python - 并行: Import a python file from sibling folder

r - 了解 R 中 mclapply 和 parLapply 之间的区别

Java:它一直说 "variable yn might not have been initialized"

java - 基于 vector 和点的四元数旋转