java - 为什么 RxJava 在异步处理时只使用约 10 个线程?

标签 java asynchronous rx-java

考虑下面的代码,我试图让 Observables 异步运行。

    try {
        DateTime now = DateTime.now();
        Observable
                .from(map.entrySet()).subscribeOn(Schedulers.from(new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 10)))
                .flatMap(Async.toAsync((Map.Entry<String, Info> entry) -> {
                    // processing work, makes multiple http requests for ref data
                }))
                .doOnCompleted(() -> System.out.println("completed yo...."))
                .doOnError(Throwable::printStackTrace)
                .toList()
                .timeout(1, TimeUnit.MINUTES)
                .toBlocking()
                .single()
                ;

        logger.info(now.toString());
        logger.info(DateTime.now().toString());

        saveToFile(gson.toJson(setForRx));
    } catch (Exception e) {
        e.printStackTrace();
    }

输出显示它使用相同的约 10 个线程进行处理,我如何增加它?

示例输出:

INFO  2015-06-29 15:11:20,524 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3 
INFO  2015-06-29 15:11:20,526 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6 
INFO  2015-06-29 15:11:20,542 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4 
INFO  2015-06-29 15:11:20,546 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7 
INFO  2015-06-29 15:11:20,571 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2
INFO  2015-06-29 15:11:20,694 [rxjava.ConcurrentRxJava] RxComputationThreadPool-1 
INFO  2015-06-29 15:11:20,920 [rxjava.ConcurrentRxJava] RxComputationThreadPool-8
INFO  2015-06-29 15:11:21,035 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7 
INFO  2015-06-29 15:11:21,039 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4 
INFO  2015-06-29 15:11:21,055 [rxjava.ConcurrentRxJava] RxComputationThreadPool-5
INFO  2015-06-29 15:11:21,081 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3 
INFO  2015-06-29 15:11:21,094 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6 
INFO  2015-06-29 15:11:21,118 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2 

在我的执行器版本中,使用 Runtime.getRuntime().availableProcessors() * 10 ,我得到 80 号泳池。 RxJava 可以做到这一点吗?

最佳答案

toAsync()默认情况下,在 computation() 上运行具有固定数量线程的调度程序。有一个需要调度程序的重载,因此您应该重构 Schedulers.from(...)到局部变量并将该变量传递给 toAsync() .

关于java - 为什么 RxJava 在异步处理时只使用约 10 个线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31123552/

相关文章:

java - Gradle:如何在 Gradle 插件中导出项目目录路径?

c - 如何在运行异步串行通信时等待响应?

javascript - Cloudsponge 异步 javascript

android - 我是否正确使用 flatMap 来合并来自多个 API 调用的结果?

java - 如何使用sqlite数据库在android中进行登录屏幕?

java - 在代码库中查找 Java Reflection 的用法

scala - 使用带有 onComplete 的 Scala future 列表进行异步处理以进行异常处理

android - RxJava 与 SQlite 和 ContentProvider 操作

asynchronous - rxjava 在创建 observable 后添加项目

java - 在运行 JUnit 测试时测量内存消耗的方法