java - 如何在 Java-8 中仅并行运行某些中间操作?

标签 java multithreading java-8

我试图模仿 java8 流中 Spring IntegrationApache CamelWireTap 的行为,其中当前处理数据的副本传递给 WireTap 以便在单独的线程中处理它,这对于日志记录和审核很有帮助

这里我只想peek中的日志记录想要在单独的线程上运行

List<String> lines = ...

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .parallel() // which is hidden by the sequential
    .peek(line -> System.out.println(line)) // don't want to run this fully on main thread
    .sequential()
    .collect(Collectors.toList());

我是否需要使用 BlockingQueueExecutorService 实现一个单独的方法来执行此操作

.peek(this::logger)

最佳答案

没有办法以不同的模式处理流管道的各个部分,并且考虑到提交异步作业对您来说是多么简单,实现这种混合模式管道也不会带来返回:

ExecutorService es = Executors.newFixedThreadPool(4);
List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .peek(line -> es.execute(() -> System.out.println(line)))
    .collect(Collectors.toList());
es.shutdown();

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .peek(line -> CompletableFuture.runAsync(() -> System.out.println(line)))
    .collect(Collectors.toList());
// when running this in the main method, avoid JVM termination before the async jobs:
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

但请注意,在这种特定情况下,实际上没有明显的差异,例如

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
upperLines.parallelStream().forEach(line -> System.out.println(line));

或者,如果您不想等待日志记录语句完成:

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
upperLines.forEach(line -> CompletableFuture.runAsync(() -> System.out.println(line)));

关于java - 如何在 Java-8 中仅并行运行某些中间操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48139680/

相关文章:

java - Dagger 2 : Field Injection which also has injection field in it

java - 英语语言计算器 [java]

java - 无法将 ISO 8601 格式的字符串解析为 Java 8 日期,在偏移量中缺少冒号

java - Java FX 中的鼠标滚动

java - 为什么我的日期时间解析尝试失败?

C# MVVM 模式在无限循环线程中更新 UI?

php - Linux 的 cron 是异步的还是同步的?

java - 如何从后台线程调用 Java 类

java - CompletableFuture.thenAccept 中使用的垃圾回收对象的可用性

java-8 - 如何使用流将一维整数数组转换为映射