java - RxJava 2.0 - 处理 publish().refCount() 中未捕获订阅者错误的资源

标签 java exception rx-java rx-java2

我是 RxJava 的新手,和其他许多人一样,我正在努力了解异常处理。我在网上阅读了很多帖子(例如此处的讨论 how to handle exceptions thrown by observer's onNext)并且认为我了解了这些概念的基本概念。

在上面提到的讨论中,其中一位发帖者说,当订阅者抛出异常时,RxJava 会执行以下操作:

Implement generic handling to log the failure and stop sending it events (of any kind) and clean up any resources due to that subscriber and carry on with any remaining subscriptions.

这或多或少也是我所看到的,我唯一有问题的是“清理任何资源”位。为了清楚起见,让我们假设以下示例:

我想创建一个 Observable 来监听异步事件源(例如 JMS 队列)和 onNext()s 在每个接收到的消息上。所以在(伪)代码中我会做类似的事情:

Observable<String> observable = Observable.create( s -> {
  createConnectionToBroker();
  getConsumer().setMessageListener(message -> s.onNext(transform(message)));
  s.setDisposable(new Disposable() {
    public void dispose() {
      tearDownBrokerConnection();
    }
  });
});

因为我想为许多订阅者/观察者重用消息监听器,所以我不直接在创建的 Observable 上订阅,而是使用 publish().refCount() 组。类似这样的东西:

Observable<String> observableToSubscribeTo = observable.publish().refCount();

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);

这一切都按预期工作。代码仅在建立第一个订阅时连接到 JMS,并在最后一个观察者 dispose()d 时关闭与代理的连接。

但是,当订阅者在 onNext()ed 时抛出异常时,事情似乎变得一团糟。正如预期的那样,throw 的观察者被删除了,每当发布新事件时,它都不会再收到通知。我的问题似乎是,当所有剩余的订阅者都是 dispose()d 时,不再通知保持与消息代理连接的 Observable。在我看来,好像抛出异常的订户处于某种僵尸状态。当涉及到事件分发时它会被忽略,但它会以某种方式阻止根 Observable 在最后一个订阅者是 dispose()d 时得到通知。

我理解 RxJava 期望观察者确保不抛出异常,而是正确处理最终的异常。不幸的是,在我想提供一个向调用者返回 Observable 的库的情况下,我无法控制我的订阅者。这意味着,我永远无法保护我的图书馆免受愚蠢的观察者的侵害。

所以,我问自己:我是否遗漏了什么?当订阅者抛出时真的没有机会正确清理吗?这是一个错误还是只是我不了解图书馆?

非常感谢任何见解!

最佳答案

如果您可以展示一些单元测试来证明问题(不需要 JMS),那就太好了。

此外,RxJava 2 中的 onNext 永远不应该抛出;如果是,则为未定义行为。如果您不信任您的消费者,您可以使用一个终端可观察转换器来执行 safeSubscribe 而不是普通的 subscribe 以防止下游的不当行为:

.compose(o -> v -> o.safeSubscribe(v))

.compose(new ObservableTransformer<T>() {
    @Override public Observable<T> apply(final Observable<T> source) {
        return new Observable<T>() {
            @Override public void subscribeActual(Observer<? super T> observer) {
                 source.safeSubscribe(observer);
            }
        };
    }
})

关于java - RxJava 2.0 - 处理 publish().refCount() 中未捕获订阅者错误的资源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42079234/

相关文章:

python - 是否有用于验证函数输入的内置 Python 异常?

android - RxJava : how to recover from errors in flatMap operator

java - 使用 RxJava/Jersey2 的异步 RestAPI。线程问题?

java - 为什么 ByteBuffers 的 hashCodes 是一样的?

java - Google App Engine 中的 Firebase 初始化错误

java - Java 中的 native 方法是什么,应该在哪里使用它们?

java - 为什么会引发ConcurrentModificationException以及如何对其进行调试

java - 使用 Apache Derby 运行 TestDB 程序 - java.sql.SQLException : The url cannot be null

rx-java - 我可以创建一个 Kotlin 扩展方法来将 rxJava 订阅添加到 CompositeSubscription 吗?

java - 过度依赖一个 arrayList