android - 为什么点击事件这里需要onBackpressure()?

标签 android rx-java rx-android

我正在尝试对按钮点击绑定(bind)的某些操作进行错误处理。对于绑定(bind),我使用 RxAndroid+RxAndroid。似乎它必须与下面的代码一起使用,但它不适用于 onBackpressure() 的注释行:

CurrentUser signIn() {
    throw new RuntimeException();
}
Integer x = 1;
PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create();

@Override
public void onStart() {
    super.onStart();
    RxView.clicks(loginButton)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext((v) -> setLoginingWaiting())

            .observeOn(Schedulers.newThread())
            .map((v) -> signIn())
            .lift(new SuppressErrorOperator<>(throwable -> {
                Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
                ++x;
                loginingFailedSubject.onNext(throwable);
            }))
            //.onBackpressureBuffer()

            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(user -> setLoginedUser(user));

    loginingFailedSubject
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(throwable -> setLoginingFailed(throwable));
}

这是 SuppressErrorOperator 代码:

public final class SuppressErrorOperator<T> implements 

Observable.Operator<T, T> {
    final Action1<Throwable> errorHandler;

    public SuppressErrorOperator(Action1<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    public SuppressErrorOperator() {this(null);}

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        return new Subscriber<T>(subscriber) {
            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                if (errorHandler != null) {
                    errorHandler.call(e);
                }
            }

            @Override
            public void onNext(T t) {
                subscriber.onNext(t);
            }
        };
    }
}

这是我在点击 100 次后最后在我的 logcat 中得到的: 糟糕,失败了 16 次!

它在 exaclty 16 次后停止,并在 17 日通过 setLoginingWaiting() 运行(我看到了,因为此方法禁用按钮,这也意味着,没有人可以在每个请求中单击超过 1 次。或接近那个数字)和.. 就是这样。似乎它根本没有到达 .lift()

但是如果我取消注释 .onBackpressureBuffer(),它现在可以完美运行了!我读了很多关于背压的文章。我什至花了一整天的时间来理解 ObservableSubscriber 等的源代码。

我知道,16 - 是 Android 缓冲区的固定大小。但是为什么会中招呢?我不经常点击按钮。另外,根本没有onNext(),所以缓冲区在任何情况下都不能超过!所有 onError() 都被 Operator 吞没了。

我也知道 observeOn() 通过 pull 协议(protocol)工作,所以它在内部想要使用 request()。如果我在 .subscribe(user -> setLoginedUser(user)); 之前评论最后一个 observeOn() - 它也会起作用(但当然,这是 Not Acceptable )。

但是为什么和什么需要 onBackpressure() 才能存活?另外,为什么它会毫无异常(exception)地死去,比如 MissingBackpressureException 或类似的东西?

最佳答案

问题是您干扰了流的生命周期。 map 崩溃,但您抑制了异常并且没有值发送到下游。如果下游没有得到任何值,它就不知道应该请求更多,因此整个序列停滞,让缓冲区填满。 onBackpressureBuffer 之所以有效,是因为它请求 Long.MAX_VALUE 并保持源运行。

但是请注意,map 不应真正像现在这样工作,而是在函数出现第一个错误迹象时取消订阅。

正确的选择是:

RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.flatMap(v -> {
    try {
        return Observable.just(signIn());
    } catch (Throwable ex) {
        Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
        ++x;
        loginingFailedSubject.onNext(ex);
        return Observable.empty();
    }
 })
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(user -> setLoginedUser(user));

关于android - 为什么点击事件这里需要onBackpressure()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35343474/

相关文章:

android - 如何使用 greenDAO 将一个对象加载到另一个对象中?

android - 为什么 GestureOverlayView 这么慢?

android - Quickblox 跨应用程序共享用户

scala - RxScala 中 FRP 的 Event 和 Behavior 对应的概念是什么?

java - RxJava : difference between doOnNext and doOnEach

java - RxJava/安卓 : Combine result of two dependent Observables

android - 有什么方法可以在 TextView 中插入 ImageSpan 而不会破坏文本?

java - RxAndroid 和多线程

android - 如何在改造 android 的请求正文中传递具有空值或空字符串的键

android - Observable.just 与 RxJava 中的 Single