我是 RxJava 的新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。
我有一个 Observable<T>
其中 T
是一些 Closeable
资源(例如 Android 数据库 Cursor
。
我可能在 observable 上有多个订阅者。我要close()
每个订阅者完成处理后的资源。换句话说,在新资源交付/发出后关闭旧资源,并在最后一个订阅者取消订阅时最终关闭最后一个资源。
我尝试使用我称之为 AutoCloseOperator
的自定义运算符使其工作,它几乎可以工作,但不太正确。 IE。我仍然是竞争条件和/或泄漏,例如资源不会关闭。
在 RxJava 中执行此操作的正确方法是什么?
假设我有这段代码:
final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);
Subscription s1 = o.subscribe(new Action1<MyResource>() {
public void call(MyResource myObj) {
System.out.println("s1 handling " + myObj);
}
});
subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered
Subscription s2 = o.subscribe(new Action1<MyResource>() {
public void call(MyResource myObj) {
System.out.println("s2 handling " + myObj);
}
});
subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));
s1.unsubscribe();
subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));
s2.unsubscribe();
subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));
然后我会期待以下行为:
s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8
注意:我不使用 PublishSubject
在我的真实代码中,我只是为了说明目的在这里使用它,我使用 Observable.create
发出 Cursor
每次更新数据库表时...
为了概括这个问题:我可以使用 doOnNext
和 doOnUnsubscribe
关闭旧项目,但这并没有考虑到这些事件会发生多次(对于每个订阅者),我只想在所有订阅者都收到新项目时关闭资源。
是使用 lift()
的自定义运算符要走的路,或者是否有一些现有的运营商可以帮助解决这个问题?
我已将我的问题简化为一个小型命令行应用程序 here on GitHub .感谢您的关注!
最佳答案
Observable.using()
是你需要的。
如果你有t
类型 T
它有一个 .close()
方法,你想从 t
中提取一些东西(你的光标)说 Observable<R>
那么这里是如何做到的:
Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();
Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);
你提到你有 Observable<T>
.要从所有 T 中获取所有 R,请像这样使用上面的代码:
Observable<T> source = ...
Observable<R> results =
source.flatMap(t -> {
Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();
return Observable.using(resourceFactory, observableFactory, disposeAction);});
关于android - RxJava : closing a resource after every subscriber handled it,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31932871/