android - RxJava : closing a resource after every subscriber handled it

标签 android rx-java rx-android

我是 RxJava 的新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。

我有一个 Observable<T>其中 T是一些 Closeable 资源(例如 Android 数据库 Cursor

我可能在 observable 上有多个订阅者。我要close()每个订阅者完成处理后的资源。换句话说,在新资源交付/发出后关闭旧资源,并在最后一个订阅者取消订阅时最终关闭最后一个资源。

我尝试使用我称之为 AutoCloseOperator 的自定义运算符使其工作,它几乎可以工作,但不太正确。 IE。我仍然是竞争条件和/或泄漏,例如资源不会关闭。

在 RxJava 中执行此操作的正确方法是什么?

假设我有这段代码:

final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);

Subscription s1 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s1 handling " + myObj);
    }
});

subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered

Subscription s2 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s2 handling " + myObj);
    }
});

subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));

s1.unsubscribe();

subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));

s2.unsubscribe();

subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));

然后我会期待以下行为:

s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8

注意:我不使用 PublishSubject在我的真实代码中,我只是为了说明目的在这里使用它,我使用 Observable.create发出 Cursor每次更新数据库表时...

为了概括这个问题:我可以使用 doOnNextdoOnUnsubscribe关闭旧项目,但这并没有考虑到这些事件会发生多次(对于每个订阅者),我只想在所有订阅者都收到新项目时关闭资源。

是使用 lift() 的自定义运算符要走的路,或者是否有一些现有的运营商可以帮助解决这个问题?

我已将我的问题简化为一个小型命令行应用程序 here on GitHub .感谢您的关注!

最佳答案

Observable.using()是你需要的。

如果你有t类型 T它有一个 .close()方法,你想从 t 中提取一些东西(你的光标)说 Observable<R>那么这里是如何做到的:

Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();

Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);

你提到你有 Observable<T> .要从所有 T 中获取所有 R,请像这样使用上面的代码:

Observable<T> source = ...
Observable<R> results = 
    source.flatMap(t -> {
        Func0<T> resourceFactory = () -> t;
        Func1<T, Observable<R>> observableFactory = x -> ...
        Action1<T> disposeAction = x -> x.close();
        return Observable.using(resourceFactory, observableFactory, disposeAction);});

关于android - RxJava : closing a resource after every subscriber handled it,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31932871/

相关文章:

java - RxJava 流 : conditional operators and error handling

android - 如何从 Observable 获取多种数据类型,因为仅接受 Observable<T> 的一种泛型类型

android - 如何在构建源代码之前在 android 中创建自定义启动屏幕?

java - Android 应用程序中的共享首选项

android - 指定的子项已有父项。您必须先在子项的父级上调用 removeView()——动态添加按钮时出现此错误

java - TestScheduler 不适用于 RxJava

android - Retrofit调用enqueue方法或者Rxjava

android - 将 Esri ArcGIS 4.3 中的 basemap URL 更改为以 https ://instead of file://开头

rx-java - 调用 d.dispose() 或 s.cancel() 方法的正确方法是什么?

android - 如何在Android中获取ObservableField的值