java - 如何在不立即订阅的情况下调用RxJava Observable?

标签 java multithreading rx-java

我有一个返回字符串模板的java方法。我想对远程 api 进行 2 个异步调用,每个调用将返回一个数字,然后我想计算这 2 个数字的总和并将其放入模板中,然后返回。

所以我有这个java代码来完成这个任务:

private Observable<Integer> createObservable() {
    Observable<Integer> obs = Observable.create(new OnSubscribe<Integer>() {

        public void call(Subscriber<? super Integer> t) {
            System.out.println("Call with thread : " + Thread.currentThread().getName());
            //FAKE CALL TO REMOTE API => THE THREAD IS SLEEPING DURING 4 SECCONDS
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            t.onNext(new Random().nextInt(10));
            t.onCompleted();
        }
    }).subscribeOn(Schedulers.newThread());
    
    return Observable
        .merge(obs, obs)
        .reduce(new Func2<Integer, Integer, Integer>() {
            
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }
        });     
}

public String retrieveTemplate() {
    //I WANT TO START THE WORK OF THE OBSERVABLE HERE BUT I DON'T KNOW HOW TO DO IT
    
    //DO THINGS IN THE MAIN THREAD
    //HERE I JUST INITIALIZE A STRING BUT WE COULD IMAGINE I WOULD DO MORE THINGS
    String s = "The final Number is {0}";
    System.out.println(Thread.currentThread().getName() + " : the string is initialized");
    
    //I WAIT FOR THE OBSERVABLE RESULT HERE
    int result = createObservable().toBlocking().first();
    
    return MessageFormat.format(s, result);
}

这段代码的输出是正确的(创建了两个线程来调用远程api)

main : the string is initialized

Call with thread : RxNewThreadScheduler-1

Call with thread : RxNewThreadScheduler-2

The final Number is 2

我想在方法retrieveTemplate的开头调用RxJava Observable(以便尽快调用远程api)并在调用之前等待结果MessageFormat.format但我不知道该怎么做

最佳答案

假设整个创建过程有效,您可能希望通过转换源可观察值将整个计算绑定(bind)到订阅时刻:

public Observable<String> retrieveTemplate() {
    return createObservable().map(result -> {
        String s = "The final Number is {0}";
        System.out.println(Thread.currentThread().getName() + " : the string is initialized");

        return MessageFormat.format(s, result);
    });
}

当您订阅 retrieveTemplate 的可观察结果时 - 您实际上开始了整个计算:

// some other place in the code
retrieveTemplate().subscribe(template -> doStuffWithTemplate(template))

关于java - 如何在不立即订阅的情况下调用RxJava Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33785052/

相关文章:

java - 集成 Spring MVC 4 和 Hibernate 5

Java:将内存刷新到磁盘

java - 分布式环境中的ExecutorCompletionService.take()方法

C++ 多线程和 vector

android - 将 AsyncTask 转换为 RxAndroid

kotlin - 可观察到在没有订阅者时缓冲项目,然后在订阅时发出它们并清除缓冲区?

java - SmartGWT 中的多列网格过滤器

multithreading - 不变的数据结构如何不是线程安全的?

java - 如何在 RxJava 2 中使用 Flowable?

java - 列的值太长 - 错误持续存在 LocalDate