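java - Problem with the zip operator in RxJava

Tags: java rx-java

I'm having a problem with the zip operator. I have 3 Observables combined with zip, and sometimes the statements in the subscribe callback are not executed. Isn't zip supposed to wait until every source Observable has emitted an item? Sample code is below.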

import java.util.Date;

import rx.Observable;
import rx.schedulers.Schedulers;
public class ZipRxJava {

    public static void main(String[] args) {
        ZipRxJava z = new ZipRxJava();
        Observable<CartPlanResponse> o1 = Observable.<CartPlanResponse>create(sub -> sub.onNext(createPlanResponse(z))).subscribeOn(Schedulers.io());
        Observable<CartFeatureResponse> o2 = Observable.<CartFeatureResponse>create(sub -> sub.onNext(createFeatureResponse(z))).subscribeOn(Schedulers.io());
        Observable<CartAccessoriesResponse> o3 = Observable.<CartAccessoriesResponse>create(sub -> sub.onNext(createAccessoriesResponse(z))).subscribeOn(Schedulers.io());
        Observable.zip(o1, o2, o3, (p1, p2, p3) -> {
            System.out.println("Inside Transformer $$$$$$$$$$$››››" + Thread.currentThread().getName());
            Response res = z.new Response();
            res.setPlanResponse(p1);
            res.setFeatureResponse(p2);
            res.setAccesoriesResponse(p3);
            return res;
        }).subscribe(r1 -> System.out.println("&&&&&&&&&&&"+ Thread.currentThread().getName() + "*******" +  r1.getPlanResponse().getPlanId() + " " + r1.getFeatureResponse().getFeatureId() + " " +
            r1.getAccesoriesResponse().getAccessoryId()), e1 -> System.out.println("Error"));
        System.out.println("Main Method ********** " + Thread.currentThread().getName());   
    }


    private static CartPlanResponse createPlanResponse(ZipRxJava z) {
        System.out.println("Plan ********** " + Thread.currentThread().getName());  
        CartPlanResponse res = z.new CartPlanResponse();
        res.setPlanId("123");
        System.out.println("Before Return Plan ********** " + Thread.currentThread().getName());
        return res;
    }

    private static CartFeatureResponse createFeatureResponse(ZipRxJava z) {
        System.out.println("Feature ********** " + Thread.currentThread().getName());
        //sleep();
        int y =0;
        for (int i =0 ; i <100000000; i++) {
            y +=i;
        }
        CartFeatureResponse res = z.new CartFeatureResponse();
        res.setFeatureId("345");
        System.out.println("Before Return Feature ********** " + Thread.currentThread().getName());
        return res;
    }

    private static CartAccessoriesResponse createAccessoriesResponse(ZipRxJava z) {
        System.out.println("Accessories ********** " + Thread.currentThread().getName());
        CartAccessoriesResponse res = z.new CartAccessoriesResponse();
        res.setAccessoryId("567");
        System.out.println("Before Return Accessories ********** " + Thread.currentThread().getName());
        return res;
    }

    private static void sleep() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private class CartPlanResponse {
        String planId;

        public String getPlanId() {
            return planId;
        }

        public void setPlanId(String planId) {
            this.planId = planId;
        }   
    }

    private class CartFeatureResponse {
        private String featureId;

        public String getFeatureId() {
            return featureId;
        }

        public void setFeatureId(String featureId) {
            this.featureId = featureId;
        }
    }

    private class CartAccessoriesResponse {
        private String accessoryId;

        public String getAccessoryId() {
            return accessoryId;
        }

        public void setAccessoryId(String accessoryId) {
            this.accessoryId = accessoryId;
        }
    }

    private class Response {
        private CartPlanResponse planResponse;
        private CartFeatureResponse featureResponse;
        private CartAccessoriesResponse accesoriesResponse;
        public CartPlanResponse getPlanResponse() {
            return planResponse;
        }
        public void setPlanResponse(CartPlanResponse planResponse) {
            this.planResponse = planResponse;
        }
        public CartFeatureResponse getFeatureResponse() {
            return featureResponse;
        }
        public void setFeatureResponse(CartFeatureResponse featureResponse) {
            this.featureResponse = featureResponse;
        }
        public CartAccessoriesResponse getAccesoriesResponse() {
            return accesoriesResponse;
        }
        public void setAccesoriesResponse(CartAccessoriesResponse accesoriesResponse) {
            this.accesoriesResponse = accesoriesResponse;
        }

    }
}

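Best answer

You have to sleep in the main method, because RxJava schedulers run on daemon threads. The JVM exits as soon as the main thread returns, as it does in your case, so the daemon threads are stopped before they finish their work and zip never gets all three items: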

    System.out.println("Main Method ********** " + Thread.currentThread().getName());
    sleep();

This prints:

Plan ********** RxIoScheduler-2
Before Return Plan ********** RxIoScheduler-2
Feature ********** RxIoScheduler-3
Main Method ********** main
Accessories ********** RxIoScheduler-4
Before Return Accessories ********** RxIoScheduler-4
Before Return Feature ********** RxIoScheduler-3
Inside Transformer $$$$$$$$$$$››››RxIoScheduler-3
&&&&&&&&&&&RxIoScheduler-3*******123 345 567
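
The daemon-thread behaviour described above can be reproduced without RxJava. The following is only a minimal sketch using plain `java.util.concurrent`: the class `DaemonWaitDemo` and the method `runOnDaemonAndWait` are hypothetical names made up for illustration, and the `CountDownLatch` is an assumed substitute for the fixed `sleep()`, not something the answer itself proposes. It blocks the calling thread only until the background work signals completion, or until a timeout elapses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original question or answer.
class DaemonWaitDemo {

    // Runs a slow task on a daemon thread and blocks the caller until the
    // task finishes or the timeout elapses. Returns null on timeout or interrupt.
    static String runOnDaemonAndWait(long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            long y = 0;
            // Same busy loop as createFeatureResponse in the question.
            for (int i = 0; i < 100_000_000; i++) {
                y += i;
            }
            result.set("sum=" + y);
            done.countDown(); // signal completion to the waiting thread
        });
        // Daemon threads do not keep the JVM alive once main returns.
        worker.setDaemon(true);
        worker.start();

        try {
            // Without this wait, main could return before the worker is done.
            boolean finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + runOnDaemonAndWait(5000));
    }
}
```

If the `await` call is removed, `main` can return while the worker is still looping, which mirrors the missing output in the question. Compared with a fixed five-second sleep, the latch returns as soon as the work is done.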

This question about the RxJava zip operator is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/43083840/
