java - RxJava : How to Subscribe only if the Observable is Cold?

标签 java rx-java subscription observable subscriber

TL;DR:如何创建一个仅在时创建订阅并在时对任何其他订阅调用进行排队的 Observable?

我想创建一个一次只能执行一个订阅的 Observable。如果有任何其他订阅者订阅了 Observable,我希望他们在 Observable 完成时(在 onComplete 之后)排队运行。

我可以通过拥有某种堆栈并在每次 onComplete 时弹出堆栈来自己构建这个结构——但感觉这个功能已经存在于 RxJava 中。

有没有办法以这种方式限制订阅?

(More on hot and cold observables)

最佳答案

我认为没有内置运算符或运算符组合可以实现此目的。以下是我将如何实现它:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public final class SequenceSubscribers<T> implements Observable.OnSubscribe<T> {

    final Observable<? extends T> source;

    final Queue<Subscriber<? super T>> queue;

    final AtomicInteger wip;

    volatile boolean active;

    public SequenceSubscribers(Observable<? extends T> source) {
        this.source = source;
        this.queue = new ConcurrentLinkedQueue<>();
        this.wip = new AtomicInteger();
    }

    @Override
    public void call(Subscriber<? super T> t) {
        SubscriberWrapper wrapper = new SubscriberWrapper(t);
        queue.add(wrapper);

        t.add(wrapper);
        t.add(Subscriptions.create(() -> wrapper.next()));

        drain();
    }

    void complete(SubscriberWrapper inner) {
        active = false;
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;
        }
        do {
            if (!active) {
                Subscriber<? super T> s = queue.poll();
                if (s != null && !s.isUnsubscribed()) {
                    active = true;
                    source.subscribe(s);
                }
            }
        } while (wip.decrementAndGet() != 0);
    }

    final class SubscriberWrapper extends Subscriber<T> {
        final Subscriber<? super T> actual;

        final AtomicBoolean once;

        public SubscriberWrapper(Subscriber<? super T> actual) {
            this.actual = actual;
            this.once = new AtomicBoolean();
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
            next();
        }

        @Override
        public void onCompleted() {
            actual.onCompleted();
            next();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }

        void next() {
            if (once.compareAndSet(false, true)) {
                complete(this);
            }
        }
    }

    public static void main(String[] args) {
        PublishSubject<Integer> ps = PublishSubject.create();

        TestSubscriber<Integer> ts1 = TestSubscriber.create();
        TestSubscriber<Integer> ts2 = TestSubscriber.create();

        Observable<Integer> source = Observable.create(new SequenceSubscribers<>(ps));

        source.subscribe(ts1);
        source.subscribe(ts2);

        ps.onNext(1);
        ps.onNext(2);

        ts1.assertValues(1, 2);
        ts2.assertNoValues();

        ts1.unsubscribe();

        ps.onNext(3);
        ps.onNext(4);
        ps.onCompleted();

        ts1.assertValues(1, 2);
        ts2.assertValues(3, 4);
        ts2.assertCompleted();
    }
}

关于java - RxJava : How to Subscribe only if the Observable is Cold?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35292180/

相关文章:

php - Google 群组和订阅代码

ios - 验证来自多个应用程序的 iOS 自动续订订阅

java - 斐波那契数列除了用户输入的数字外不会打印任何内容

java - 正则表达式在java中不起作用

java - 从数组引用值时出现问题

java - 如何在用户定义的类中使用getDocumentElement?

java - 如果我使用 Single<> 为什么我不应该使用blockingGet() 以及替代品是什么

android - LiveDataReactiveStreams : converting Flowable to LiveData doesn't work

android - 使用 RxAndroid 过滤数据库项目的正确方法

iphone - 针对服务而非内容的 iOS 定期订阅政策