我试图模仿 java8 流中 Spring Integration
和 Apache Camel
中 WireTap
的行为,其中当前处理数据的副本传递给 WireTap 以便在单独的线程中处理它,这对于日志记录和审核很有帮助
这里我只想peek
中的日志记录想要在单独的线程上运行
List<String> lines = ...
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.parallel() // which is hidden by the sequential
.peek(line -> System.out.println(line)) // don't want to run this fully on main thread
.sequential()
.collect(Collectors.toList());
我是否需要使用 BlockingQueue
或 ExecutorService
实现一个单独的方法来执行此操作
.peek(this::logger)
最佳答案
没有办法以不同的模式处理流管道的各个部分,并且考虑到提交异步作业对您来说是多么简单,实现这种混合模式管道也不会带来返回:
ExecutorService es = Executors.newFixedThreadPool(4);
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.peek(line -> es.execute(() -> System.out.println(line)))
.collect(Collectors.toList());
es.shutdown();
或
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.peek(line -> CompletableFuture.runAsync(() -> System.out.println(line)))
.collect(Collectors.toList());
// when running this in the main method, avoid JVM termination before the async jobs:
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
但请注意,在这种特定情况下,实际上没有明显的差异,例如
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
upperLines.parallelStream().forEach(line -> System.out.println(line));
或者,如果您不想等待日志记录语句完成:
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
upperLines.forEach(line -> CompletableFuture.runAsync(() -> System.out.println(line)));
关于java - 如何在 Java-8 中仅并行运行某些中间操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48139680/