在我的应用程序中,我有一项服务可以跟踪用户位置,然后使用 RxJava
将其发送到服务器。如果请求成功,我会收到插入的 ID,然后我可以从我的本地数据库中删除它们。
- 查询数据库以获取要发送的积分
- 如果不为空,我会发布所有从数据库中收集到的点
- 如果请求成功,我会从数据库中删除发布的积分
我的问题是,我在上一个任务结束之前快速调用该可观察对象,以便服务器接收重复点(两个请求的相同点)。我需要在单线程上执行 Observable
以避免在前一个任务结束之前让另一个任务查询数据库。我创建了一个 Looper
线程,但我仍然发送重复项,但我不知道为什么。服务器请求现在似乎要等到它结束才能执行下一个请求,但在下一个请求中,它仍然发送相同的点!啊啊
final StoreChangeEvent finalEvent = event;
Observable
.defer(() -> Observable.just(database.getAllPoints()))
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper)))
.subscribe();
似乎 database.getAllPoints()
被调用得太快了……我应该添加一个 .blocking() 吗?
假设我有 5 个积分要发布到服务器 (A,B,C,D)
- 我查询数据库并将 A-B-C-D 发送到服务器
- 我从设备接收到另一个点(点 E)
- 我查询数据库并发送 (A-B-C-D-E)
- 我从服务器收到成功,然后我从本地数据库中删除 A-B-C-D
- 我从第二个请求中收到成功,然后我从本地数据库中删除了 A-B-C-D-E
结果:A-B-C-D 在服务器数据库中出现了两次,因为两个请求是用相同的点发送的
最佳答案
选项 1 - 主题
您可以尝试使用 Subject
- 一个既充当观察者又充当可观察对象的事物。
订阅它一次并通知它来自其他地方的新事件(onNext()
)。订阅者将随后处理这些事件。
我使用了 SerializedSubject
,以防您从不同的线程调用 notifyNewEvent()
,否则您可以使用 BehaviourSubject
。
SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create())
public void initialize() {
// Since you access your incoming event from doOnCompleted,
// need this extra flatMap function so that you can access your event
// outside rx java chain.
subject.flatMap(new Func1() {
@Override
public Observable call(StoreChangeEvent event) {
return Observable
.just(database.getAllPoints())
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent)));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
public void notifyNewEvent(StoreChangeEvent event) {
subject.onNext(event);
}
选项 2 - 执行者
如果您不访问 UI,为什么还要为主题和 observeOn、subscribeOn 而烦恼。创建一个具有一个线程的执行器(所有任务随后执行),并将任务提交给它,在那里利用 RxJava 的有用功能。
ExecutorService executorService = Executors.newSingleThreadExecutor();
public void notifyNewEvent(final StoreChangeEvent event) {
executorService.execute(new Runnable() {
public void run() {
Observable.just(database.getAllPoints())
// Blocking, so that the thread doesn't exit
// and blocks on subscribe() till completion.
.toBlocking()
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent)))
.subscribe();
}
});
}
关于android - 使 RxJava 异步任务线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39456532/