multithreading - What is the advantage of forking a stream over just using multiple streams?

Tags: multithreading java-8 java-stream completable-future blockingqueue

I am reading Java 8 in Action, where the authors refer to this link: http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html

and write their own stream forker, which looks like this:

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String... args) {
        List<Person> people = Arrays.asList(new Person(23, "Paul"), new Person(24, "Nastya"), new Person(30, "Unknown"));
        StreamForker<Person> forker = new StreamForker<>(people.stream())
                .fork("All names", s -> s.map(Person::getName).collect(Collectors.joining(", ")))
                .fork("Age stats", s -> s.collect(Collectors.summarizingInt(Person::getAge)))
                .fork("Oldest", s -> s.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());
        Results results = forker.getResults();

        String allNames = results.get("All names");
        IntSummaryStatistics stats = results.get("Age stats");
        Person oldest = results.get("Oldest");

        System.out.println(allNames);
        System.out.println(stats);
        System.out.println(oldest);
    }

    interface Results {
        <R> R get(Object key);
    }

    static class StreamForker<T> {
        private final Stream<T> stream;
        private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

        public StreamForker(Stream<T> stream) {
            this.stream = stream;
        }

        public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
            forks.put(key, f);
            return this;
        }

        public Results getResults() {
            ForkingStreamConsumer<T> consumer = build();
            try {
                stream.sequential().forEach(consumer);
            } finally {
                consumer.finish();
            }
            return consumer;
        }

        private ForkingStreamConsumer<T> build() {
            List<BlockingQueue<T>> queues = new ArrayList<>();

            Map<Object, Future<?>> actions =
                    forks.entrySet().stream().reduce(
                            new HashMap<>(),
                            (map, e) -> {
                                map.put(e.getKey(),
                                        getOperationResult(queues, e.getValue()));
                                return map;
                            },
                            (m1, m2) -> {
                                m1.putAll(m2);
                                return m1;
                            }
                    );
            return new ForkingStreamConsumer<>(queues, actions);
        }

        private Future<?> getOperationResult(List<BlockingQueue<T>> queues,
                                             Function<Stream<T>, ?> f) {
            BlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
            Stream<T> source = StreamSupport.stream(spliterator, false);
            return CompletableFuture.supplyAsync(() -> f.apply(source));
        }
    }

    static class ForkingStreamConsumer<T> implements Results, Consumer<T> {
        static final Object END_OF_STREAM = new Object();
        private final List<BlockingQueue<T>> queues;
        private final Map<Object, Future<?>> actions;

        ForkingStreamConsumer(List<BlockingQueue<T>> queues,
                              Map<Object, Future<?>> actions) {
            this.queues = queues;
            this.actions = actions;
        }

        public void finish() {
            accept((T) END_OF_STREAM);
        }

        @Override
        public <R> R get(Object key) {
            try {
                return ((Future<R>) actions.get(key)).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void accept(T t) {
            queues.forEach(q -> q.add(t));
        }
    }

    static class BlockingQueueSpliterator<T> implements Spliterator<T> {

        private final BlockingQueue<T> q;

        public BlockingQueueSpliterator(BlockingQueue<T> q) {
            this.q = q;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T t;
            while (true) {
                try {
                    t = q.take();
                    break;
                } catch (InterruptedException e) {
                    // Interrupted while waiting: ignore it and retry take() until an element arrives.
                }
            }

            if (t != ForkingStreamConsumer.END_OF_STREAM) {
                action.accept(t);
                return true;
            }
            return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return 0;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }

    static class Person {
        private int age;
        private String name;

        public Person(int age, String name) {
            this.age = age;
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return String.format("Age: %d, name: %s", age, name);
        }
    }
}

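How the authors' code works:

First, we create a StreamForker from a stream. Then we fork 3 operations, declaring what we want to do with that stream in parallel. In our case the data model is the Person{age, name} class, and we want to perform 3 operations:

  • get a string of all the names
  • get the age statistics
  • find the oldest person

Then we call forker.getResults(), which applies a ForkingStreamConsumer to the stream. The consumer adds every element to each of 3 blocking queues; each queue is then turned into its own stream, and the 3 streams are processed in parallel.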
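The fan-out step described above can be sketched in isolation. This is a simplified illustration, not the book's code: the class FanOutSketch and its methods fanOut and drain are hypothetical names, and the consumers simply record what they receive instead of running a stream pipeline. It assumes, as in the question, unbounded LinkedBlockingQueue instances and a sentinel object marking the end of the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the code in the question.
class FanOutSketch {
    // Sentinel marking the end of the input, playing the role of END_OF_STREAM.
    private static final Object END = new Object();

    // Traverses `source` once, copies every element into one queue per consumer,
    // and returns what each consumer received, in consumer order.
    static List<List<Object>> fanOut(List<?> source, int consumers) {
        List<BlockingQueue<Object>> queues = new ArrayList<>();
        List<CompletableFuture<List<Object>>> futures = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            // Each consumer starts asynchronously and blocks on its own queue.
            futures.add(CompletableFuture.supplyAsync(() -> drain(queue)));
        }
        // Single pass over the source: each element goes to every queue.
        for (Object element : source) {
            for (BlockingQueue<Object> queue : queues) {
                queue.add(element);
            }
        }
        // Tell every consumer that no more elements will arrive.
        for (BlockingQueue<Object> queue : queues) {
            queue.add(END);
        }
        List<List<Object>> results = new ArrayList<>();
        for (CompletableFuture<List<Object>> future : futures) {
            results.add(future.join());
        }
        return results;
    }

    // Takes elements from the queue until the sentinel shows up.
    private static List<Object> drain(BlockingQueue<Object> queue) {
        List<Object> seen = new ArrayList<>();
        try {
            for (Object o = queue.take(); o != END; o = queue.take()) {
                seen.add(o);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop early
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fanOut(Arrays.asList("Paul", "Nastya", "Unknown"), 3));
    }
}
```

Running main prints [[Paul, Nastya, Unknown], [Paul, Nastya, Unknown], [Paul, Nastya, Unknown]]: every consumer sees the complete sequence, so the elements are duplicated across the queues rather than partitioned among them.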

My question is: what advantage does this approach have over simply doing this:

Future<String> allNames2 =
                CompletableFuture.supplyAsync(() -> people.stream().map(Person::getName).collect(Collectors.joining(", ")));
Future<IntSummaryStatistics> stats2 =
                CompletableFuture.supplyAsync(() -> people.stream().collect(Collectors.summarizingInt(Person::getAge)));
Future<Person> oldest2 =
                CompletableFuture.supplyAsync(() -> people.stream().reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());

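Best answer

To me, it doesn't make much sense with an ArrayList as the stream source.

If the stream source is a large file that you process with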

StreamForker<Person> forker = new StreamForker<>(
    java.nio.file.Files.lines(Paths.get("somepath"))
        .map(Person::new))
    .fork(...)

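then it might prove beneficial, because you process the whole file only once, whereas with three separate calls to Files.lines(...) you would read the file three times.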
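A rough way to see the difference is to count how many elements are pulled from the source. The sketch below is only an illustration under stated assumptions: TraversalCountSketch, countingSource and both readsWith... methods are hypothetical names, an in-memory list stands in for the file, and the single pass feeds plain accumulators rather than the queue-based StreamForker, which would traverse its source the same number of times.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; the list stands in for an expensive source such as a file.
class TraversalCountSketch {
    static final List<String> LINES = Arrays.asList("Paul", "Nastya", "Unknown");

    // Supplies fresh streams that increment `reads` for every element pulled from the source.
    static Supplier<Stream<String>> countingSource(AtomicInteger reads) {
        return () -> LINES.stream().peek(line -> reads.incrementAndGet());
    }

    // Three independent pipelines: the source is traversed three times.
    static int readsWithSeparateStreams() {
        AtomicInteger reads = new AtomicInteger();
        Supplier<Stream<String>> source = countingSource(reads);
        String names = source.get().collect(Collectors.joining(", "));
        IntSummaryStatistics lengths =
                source.get().collect(Collectors.summarizingInt(String::length));
        String longest =
                source.get().reduce((a, b) -> a.length() >= b.length() ? a : b).get();
        return reads.get();
    }

    // One traversal that feeds three accumulators at once.
    static int readsWithSinglePass() {
        AtomicInteger reads = new AtomicInteger();
        StringJoiner names = new StringJoiner(", ");
        IntSummaryStatistics lengths = new IntSummaryStatistics();
        String[] longest = {""};
        countingSource(reads).get().forEach(line -> {
            names.add(line);
            lengths.accept(line.length());
            if (line.length() > longest[0].length()) {
                longest[0] = line;
            }
        });
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println(readsWithSeparateStreams()); // 9 element reads
        System.out.println(readsWithSinglePass());      // 3 element reads
    }
}
```

With a three-element list the gap is trivial, which matches the answer's point about ArrayList sources; it only starts to matter when each read of the source is costly, such as disk or network I/O.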

For more on "multithreading - What is the advantage of forking a stream over just using multiple streams?", see the similar question on Stack Overflow: https://stackoverflow.com/questions/50896974/
