java - RxJava : observable that contains an asynchronous call

标签 java asynchronous reactive-programming rx-java

我试图理解 RxJava 并遇到以下情况。

考虑以下返回调用 NsdManager.registerService 的可观察对象的方法。 registerService 方法需要一个监听器,当注册成功(或失败)时调用。

public Observable<Boolean> registerService() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {

        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);

            // how to proceed?
        }
    });
}

观察者只有在监听器被调用后才能提供通知,但监听器是异步调用的。

我如何使用 RxJava 做到这一点?


我使用 BehaviorSubject 想出了以下内容。不知道这是否是最佳解决方案,但它确实有效。

private BehaviorSubject<Boolean> registrationSubject;

public Observable<Boolean> registerService() {
    registrationSubject = BehaviorSubject.create();

    Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            NsdServiceInfo serviceInfo  = new NsdServiceInfo();
            serviceInfo.setServiceName(serviceName);
            serviceInfo.setServiceType(NSD_SERVICE_TYPE);
            serviceInfo.setPort(serverSocket.getLocalPort());

            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
        }
    }).subscribe(registrationSubject);

    return registrationSubject;
}

private NsdManager.RegistrationListener registrationListener = new NsdManager.RegistrationListener() {
    @Override
    public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
        registrationSubject.onNext(false);
        registrationSubject.onCompleted();
    }

    @Override
    public void onServiceRegistered(NsdServiceInfo serviceInfo) {
        registrationSubject.onNext(true);
        registrationSubject.onCompleted();
    }

    @Override
    public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }

    @Override
    public void onServiceUnregistered(NsdServiceInfo serviceInfo) {}
};

最佳答案

我认为最好尽可能避免使用 Subjects。 在您的解决方案中,您仅使用主题来调用 onNextonCompleted。但是,在 Observable.create() 方法中,您已经可以访问可以调用这些方法的 subscriber。换句话说,您可以将事件处理程序的完整设置包装在 Observable.create() 方法中。

public Observable<Boolean> registerService() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(final Subscriber<? super Boolean> subscriber) {
            NsdServiceInfo serviceInfo  = new NsdServiceInfo();
            serviceInfo.setServiceName(serviceName);
            serviceInfo.setServiceType(NSD_SERVICE_TYPE);
            serviceInfo.setPort(serverSocket.getLocalPort());

            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, 
                new NsdManager.RegistrationListener() {
                    @Override
                    public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                        }
                    }

                    @Override
                    public void onServiceRegistered(NsdServiceInfo serviceInfo) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(true);
                            subscriber.onCompleted();
                        }
                    }

                    @Override
                    public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }

                    @Override
                    public void onServiceUnregistered(NsdServiceInfo serviceInfo) {}
                }
            );
        }
    });
}

关于java - RxJava : observable that contains an asynchronous call,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25259901/

相关文章:

Perl:基于事件和基于并行的风格 - 何时选择一种?

java - 如何在 RxJava 中将单个的输出传递给可完成的?

javascript - WebSocket - Javascript 客户端表示已连接,但 Java 服务器没有

javascript - 正确终止 NodeJS Async Await

angular - 我认为变量不会反射(reflect)可观察值的值

c# - 在前一个元素匹配条件后获取流的第一个元素

javascript - 带有 redux 的 RxJS - 完成时发出 Action (使用 mergeMap)

java - 如何在 AWS 中创建新用户

java - PosixFileFunctions.stat在Linux for Gradle 2.3中失败并显示UnsatisfiedLinkError

java - 传递一个类参数并返回该类的对象(java)