java - RxJava-我的运算符(operator)上缺少Back压异常?

标签 java rx-java reactive-programming

我实现了自己的客户操作符,名为 DoCountOperator,它支持我创建的自定义 Observable 操作符,例如 doOnNextCount()doOnErrorCount( )doOnCompletedCount() 计数。它允许为这些事件创建副作用,并对这些事件的发射计数执行某些操作。例如,doOnCompletedCount() 允许您在 onCompleted() 事件之后创建具有完整发射计数的副作用。

尽管如此,我遇到了大量排放的MissingBackPressureException问题,并且我的堆栈跟踪提到了我的运算符(operator)。这是运算符...

final class DoCountOperator<T> implements Observable.Operator<T,T> {

    interface CountObserver {
        void onNext(int emissionCount);
        void onError(int emissionCount);
        void onCompleted(int emissionCount);
    }

    private final DoCountOperator.CountObserver doObserver;

    DoCountOperator(DoCountOperator.CountObserver doObserver) {
        this.doObserver = doObserver;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        return new Subscriber<T>() {
            private int count = 0;
            private boolean done = false;

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                try {
                    doObserver.onCompleted(count);
                }catch (Throwable throwable) {
                    //Exceptions.throwIfFatal(throwable);
                    onError(throwable);
                }

                done = true;
                child.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                if (done) {
                    return;
                }
                try {
                    doObserver.onError(count);
                } catch (Throwable throwable1) {
                    throwable1.printStackTrace();
                }
                child.onError(throwable);
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                try {
                    doObserver.onNext(++count);
                }catch (Throwable throwable) {
                    Exceptions.throwIfFatal(throwable);
                }
                child.onNext(t);
            }
        };
    }
}

以下是doOnCompletedCount()如何使用它。当我发出 50,000 到 90,000 个发出的项目时,我收到错误。

public static <T> Observable.Operator<T,T> doOnCompletedCount(final IntConsumer countAction) {
        return new DoCountOperator<>(new DoCountOperator.CountObserver() {
            @Override
            public void onNext(int emissionCount) {

            }

            @Override
            public void onError(int emissionCount) {

            }

            @Override
            public void onCompleted(int emissionCount) {
                countAction.accept(emissionCount);
            }
        });
    }

我无法想象这个运算符(operator)如何会因背压而不知所措,但也许它无法跨链传达背压问题?我到底做错了什么?这是堆栈跟踪...

rx.exceptions.MissingBackpressureException
                at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:352)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:346)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:329)
                at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
                at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:117)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at com.swa.rm.common.rx.DoCountOperator$1.onNext(DoCountOperator.java:66)
                at rx.internal.operators.OperatorFinally$1.onNext(OperatorFinally.java:48)
                at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
                at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
                at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
                at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
                at com.swa.rm.common.rx.RxOperators$1.onNext(RxOperators.java:49)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OperatorFinally$1.onNext(OperatorFinally.java:48)
                at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
                at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
                at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
                at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
                at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
                at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
                at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
                at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
                at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
                at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
                at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
                at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
                at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:73)
                at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
                at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
                at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
                at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
                at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
                at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
                at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
                at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
                at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
                at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
                at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
                at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
                at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
                at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
                at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
                at rx.Subscriber.setProducer(Subscriber.java:211)
                at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
                at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
                at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
                at rx.internal.operators.CachedObservable$ReplayProducer.request(CachedObservable.java:304)
                at rx.Subscriber.setProducer(Subscriber.java:211)
                at rx.Subscriber.setProducer(Subscriber.java:205)
                at rx.Subscriber.setProducer(Subscriber.java:205)
                at rx.Subscriber.setProducer(Subscriber.java:205)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:244)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
                at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
                at rx.Subscriber.setProducer(Subscriber.java:211)
                at rx.Subscriber.setProducer(Subscriber.java:205)
                at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
                at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
                at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
                at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
                at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
                at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
                at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
                at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
                at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
                at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
                at rx.internal.operators.OperatorToObservableSortedList$2.onCompleted(OperatorToObservableSortedList.java:82)
                at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
                at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onCompleted(OperatorGroupBy.java:264)
                at rx.observers.Subscribers$5.onCompleted(Subscribers.java:220)
                at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:155)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
                at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:339)
                at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onCompleted(OperatorGroupBy.java:161)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:614)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:255)
                at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
                at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
                at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
                at rx.internal.operators.CachedObservable$CacheState.onCompleted(CachedObservable.java:211)
                at rx.internal.operators.CachedObservable$CacheState$1.onCompleted(CachedObservable.java:179)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:614)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:255)
                at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:614)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:255)
                at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
                at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
                at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
                at rx.internal.operators.CachedObservable$CacheState.onCompleted(CachedObservable.java:211)
                at rx.internal.operators.CachedObservable$CacheState$1.onCompleted(CachedObservable.java:179)
                at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:183)
                at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:167)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:150)
                at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
                at rx.internal.operators.OperatorBufferWithSize$1.onCompleted(OperatorBufferWithSize.java:126)
                at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:183)
                at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:167)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:150)
                at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
                at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:183)
                at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:167)
                at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:150)
                at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:101)
                at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
                at rx.Subscriber.setProducer(Subscriber.java:211)
                at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
                at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable$2.call(Observable.java:162)
                at rx.Observable$2.call(Observable.java:154)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
                at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
                at rx.Observable.unsafeSubscribe(Observable.java:8098)
                at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
                at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:98)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)

最佳答案

您忘记在代码中链接下游订阅者的取消订阅和背压“ channel ”。你需要这样的东西:

final class DoCountOperator<T> implements Observable.Operator<T,T> {

    interface CountObserver {
        void onNext(int emissionCount);
        void onError(int emissionCount);
        void onCompleted(int emissionCount);
    }

    private final DoCountOperator.CountObserver doObserver;

    DoCountOperator(DoCountOperator.CountObserver doObserver) {
        this.doObserver = doObserver;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        Subscriber<T> parent = new Subscriber<T>() {
            private int count = 0;
            private boolean done = false;

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                try {
                    doObserver.onCompleted(count);
                }catch (Throwable throwable) {
                    //Exceptions.throwIfFatal(throwable);
                    onError(throwable);
                }

                done = true;
                child.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                if (done) {
                    return;
                }
                try {
                    doObserver.onError(count);
                } catch (Throwable throwable1) {
                    throwable1.printStackTrace();
                }
                child.onError(throwable);
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                try {
                    doObserver.onNext(++count);
                }catch (Throwable throwable) {
                    Exceptions.throwIfFatal(throwable);
                }
                child.onNext(t);
            }

            @Override
            public void setProducer(Producer p) {
                child.setProducer(p);
            }
        };

        child.add(parent);

        return parent;
    }
}

因此,您不必在 call() 上返回自定义订阅者,而是将其放入本地变量中并调用 child.add(parent) 从而建立取消订阅链。为了使背压发挥作用,最好的方法是重写自定义订阅服务器中的 setProducer 并使用收到的 Producer 实例简单地调用 child.setProducer

关于java - RxJava-我的运算符(operator)上缺少Back压异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35301902/

相关文章:

java - 如何通过内部函数向 Action 函数返回异常?

java - 具有最后状态的 RxJava Observable

javascript - Bacon.js 惰性求值,鼠标拖动示例在没有 log() 语句的情况下中断

java - 如何使用 ConnectableObservable 预取然后将处理后的数据用于不同的订阅者

reactive-programming - Spring react 器|对输入进行批处理而不进行变异

c# - Xamarin Forms/ReactiveUI Router-在关闭 subview 后显示 subview 并执行父ViewModel代码

java - MainActivity 中的 setListAdapter

Java:我该如何关闭包裹在 SLF4J 外观中的异步附加程序?

java - SimpleThreads 示例中的 threadMessage 是否应该同步?

java - 将 ByteBuffer 流转换为 Rx 中的行的有效方法