我正在尝试对按钮点击绑定(bind)的某些操作进行错误处理。对于绑定(bind),我使用 RxAndroid+RxAndroid。似乎它必须与下面的代码一起使用,但它不适用于 onBackpressure()
的注释行:
CurrentUser signIn() {
throw new RuntimeException();
}
Integer x = 1;
PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create();
@Override
public void onStart() {
super.onStart();
RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.map((v) -> signIn())
.lift(new SuppressErrorOperator<>(throwable -> {
Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
++x;
loginingFailedSubject.onNext(throwable);
}))
//.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> setLoginedUser(user));
loginingFailedSubject
.observeOn(AndroidSchedulers.mainThread())
.subscribe(throwable -> setLoginingFailed(throwable));
}
这是 SuppressErrorOperator
代码:
public final class SuppressErrorOperator<T> implements
Observable.Operator<T, T> {
final Action1<Throwable> errorHandler;
public SuppressErrorOperator(Action1<Throwable> errorHandler) {
this.errorHandler = errorHandler;
}
public SuppressErrorOperator() {this(null);}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (errorHandler != null) {
errorHandler.call(e);
}
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
};
}
}
这是我在点击 100 次后最后在我的 logcat 中得到的:
糟糕,失败了 16 次!
它在 exaclty 16 次后停止,并在 17 日通过 setLoginingWaiting()
运行(我看到了,因为此方法禁用按钮,这也意味着,没有人可以在每个请求中单击超过 1 次。或接近那个数字)和.. 就是这样。似乎它根本没有到达 .lift()
。
但是如果我取消注释 .onBackpressureBuffer()
,它现在可以完美运行了!我读了很多关于背压的文章。我什至花了一整天的时间来理解 Observable
、Subscriber
等的源代码。
我知道,16 - 是 Android 缓冲区的固定大小。但是为什么会中招呢?我不经常点击按钮。另外,根本没有onNext()
,所以缓冲区在任何情况下都不能超过!所有 onError()
都被 Operator
吞没了。
我也知道 observeOn()
通过 pull 协议(protocol)工作,所以它在内部想要使用 request()
。如果我在 .subscribe(user -> setLoginedUser(user));
之前评论最后一个 observeOn()
- 它也会起作用(但当然,这是 Not Acceptable )。
但是为什么和什么需要 onBackpressure()
才能存活?另外,为什么它会毫无异常(exception)地死去,比如 MissingBackpressureException
或类似的东西?
最佳答案
问题是您干扰了流的生命周期。 map
崩溃,但您抑制了异常并且没有值发送到下游。如果下游没有得到任何值,它就不知道应该请求更多,因此整个序列停滞,让缓冲区填满。 onBackpressureBuffer
之所以有效,是因为它请求 Long.MAX_VALUE 并保持源运行。
但是请注意,map
不应真正像现在这样工作,而是在函数出现第一个错误迹象时取消订阅。
正确的选择是:
RxView.clicks(loginButton)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((v) -> setLoginingWaiting())
.observeOn(Schedulers.newThread())
.flatMap(v -> {
try {
return Observable.just(signIn());
} catch (Throwable ex) {
Log.e("MyTag", "Oops, failed " + x.toString() + " times!");
++x;
loginingFailedSubject.onNext(ex);
return Observable.empty();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(user -> setLoginedUser(user));
关于android - 为什么点击事件这里需要onBackpressure()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35343474/