java - 并发执行 : Future vs parallelstream

标签 java performance parallel-processing java-stream future

我编写了一个可调用程序,用于轮询远程客户端以获取信息并以列表形式返回该信息。我正在使用线程池执行器、for 循环和 Future 来针对多个远程客户端并行执行任务。然后,我将所有 Future 列表与 addAll() 结合起来,并使用巨大的组合列表。

我的问题是,在这里使用 parallelstream() 会比使用 future 和 for 循环更有效吗?编码当然更容易!如果我走那条路,我还需要线程池执行器吗?

谢谢!

        for(SiteInfo site : active_sites) {
            TAG_SCANNER scanr = new TAG_SCANNER(site, loggr);
            Future<List<TagInfo>> result = threadmaker.submit(scanr);

            //SOUND THE ALARMS
            try {
                alarm_tags.addAll(result.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

可能的解决方案代码? Netbeans 提出了类似的建议

active_sites.parallelstream().map((site) -> new TAG_SCANNER(site, loggr)).map((scanr) -> threadmaker.submit(scanr)).forEach((result) -> {
            //SOUND THE ALARMS
            try {
                alarm_tags.addAll(result.get());
            }
            catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

最佳答案

这里有几个误解。首先,如果调用 Future.get,使用异步任务不会提高资源利用率。提交任务后,立即等待任务完成再提交下一个任务。

其次,Netbeans 进行的代码转换产生了一个基本相同的代码,仍然将任务提交给 Executor所以这不是“Future vs parallelstream”的问题,因为你只是用并行流执行提交(和等待)并且仍然使用执行器。由于您的第一个错误,并行执行它可能会提高吞吐量,但除此之外,将两个错误结合起来让它们相互抵消绝不是一个好主意,这仍然是一个糟糕的解决方案:

Stream API 的标准实现针对 CPU 密集型任务进行了优化,创建与 CPU 核心数量相匹配的线程数量,并且当这些线程在等待操作中被阻塞时不会产生新线程。因此,使用并行流来执行 I/O 操作,或者通常可能等待的操作,并不是一个好的选择。而且您无法控制实现所使用的线程。

更好的选择是留在 ExecutorService您可以根据您对远程客户端的预期 I/O 带宽进行配置。但是你应该修复提交后立即等待的错误,先提交所有任务,然后等待所有任务完成。请注意,您可以为此使用流 API,但不是为了更好的并行性,而是可能提高可读性:

// first, submit all tasks, assuming "threadmaker" is an ExecutorService
List<Future<List<TagInfo>>> futures=threadmaker.invokeAll(
    active_sites.stream()
        .map(site -> new TAG_SCANNER(site, loggr))
        .collect(Collectors.toList())
);
// now fetch all results
for(Future<List<TagInfo>> result: futures) {
    //SOUND THE ALARMS
    try {
        alarm_tags.addAll(result.get());
    } catch (InterruptedException | ExecutionException e) {
        // not a recommended way of handling
        // but I keep your code here for simplicity
        e.printStackTrace();
    }
}

请注意,此处使用的流 API 是顺序并且仅用于转换您的 SiteInfo 列表到 Callable<List<TagInfo>> 的列表,但您可以使用循环执行相同的操作。

关于java - 并发执行 : Future vs parallelstream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36456032/

相关文章:

parallel-processing - 我可以在 Circle CI 中循环遍历过滤器值列表,并使用每个过滤器值开始一项作业吗?

concurrency - 哪些挑战促进了并行/并发架构的使用?

java - 同时线程运行

Java EE应用程序不像jsp那样打印JSP文件,而是像HTML一样打印

c++ - 堆数组性能慢

c++ - 共享内存和性能

java - 使用 Jackson (ObjectMapper) 如何将对象序列化为 json 并忽略除我注释为 @JsonProperty 的字段之外的所有字段?

java - KafkaConsumer 在 KafkaServer 上出错(版本 0.9.0.1)

sql - 使用时间戳 OVERLAPS 和 "PARTITION BY"加速 PostgreSQL 查询

c# - CPU 绑定(bind)任务的并行化继续与 IO 绑定(bind)