java - FutureCallback 到 Observable

标签 java callback rx-java observable

有没有办法从 FutureCallback 创建 Observable ?我发现 Observable.create 已被弃用,并且不是正确的方法(如下)。

我正在转换

Observable.create(new Observable.OnSubscribe<HttpResponse>() {
        @Override
        public void call(Subscriber<? super HttpResponse> subscriber) {
            getClient().execute(httpRequest, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse response) {
                    subscriber.onNext(response);
                    subscriber.onCompleted();
                }
                @Override
                public void failed(Exception ex) {
                    subscriber.onError(ex);
                }
                @Override
                public void cancelled() {
                    subscriber.onError(new Exception());
                }
            });
        }
})

最佳答案

使用其他创建 overload :

Observable.<Event>create(emitter -> {
    Callback listener = new Callback() {
        @Override
        public void onEvent(Event e) {
            emitter.onNext(e);
            if (e.isLast()) {
                emitter.onCompleted();
            }
        }

        @Override
        public void onFailure(Exception e) {
            emitter.onError(e);
        }
    };

    AutoCloseable c = api.someMethod(listener);

    emitter.setCancellation(c::close);

}, BackpressureMode.BUFFER);

关于java - FutureCallback 到 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44601854/

相关文章:

android - 如何使用 RxJava 处理 Retrofit 2 中的网络错误

java - 为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?

java - 如何对集合子集应用进一步的过滤器

java - 如何从媒体播放器获取剩余歌曲和总持续时间?

html - Paypal 回调未验证

javascript - 全局变量未使用nodejs更新

java - 为什么 Android Activity 中的方法回调必须是静态的?

java - 使用java根据时间戳计算每秒事务数

java - 在 junit 中测试当前的 java 时间戳

android - RxJava - 捕捉消费者异常