android - 使 RxJava 异步任务线程安全

标签 android multithreading rx-java retrofit2

在我的应用程序中,我有一项服务可以跟踪用户位置,然后使用 RxJava 将其发送到服务器。如果请求成功,我会收到插入的 ID,然后我可以从我的本地数据库中删除它们。

  1. 查询数据库以获取要发送的积分
  2. 如果不为空,我会发布所有从数据库中收集到的点
  3. 如果请求成功,我会从数据库中删除发布的积分

我的问题是,我在上一个任务结束之前快速调用该可观察对象,以便服务器接收重复点(两个请求的相同点)。我需要在单线程上执行 Observable 以避免在前一个任务结束之前让另一个任务查询数据库。我创建了一个 Looper 线程,但我仍然发送重复项,但我不知道为什么。服务器请求现在似乎要等到它结束才能执行下一个请求,但在下一个请求中,它仍然发送相同的点!啊啊

    final StoreChangeEvent finalEvent = event;
    Observable
            .defer(() -> Observable.just(database.getAllPoints()))
            .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(finalEvent))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(AndroidSchedulers.from(backgroundLooper)))
            .subscribe();

似乎 database.getAllPoints() 被调用得太快了……我应该添加一个 .blocking() 吗?

假设我有 5 个积分要发布到服务器 (A,B,C,D)

  1. 我查询数据库并将 A-B-C-D 发送到服务器
  2. 我从设备接收到另一个点(点 E)
  3. 我查询数据库并发送 (A-B-C-D-E)
  4. 我从服务器收到成功,然后我从本地数据库中删除 A-B-C-D
  5. 我从第二个请求中收到成功,然后我从本地数据库中删除了 A-B-C-D-E

结果:A-B-C-D 在服务器数据库中出现了两次,因为两个请求是用相同的点发送的

最佳答案

选项 1 - 主题

您可以尝试使用 Subject - 一个既充当观察者又充当可观察对象的事物。

订阅它一次并通知它来自其他地方的新事件(onNext())。订阅者将随后处理这些事件。

我使用了 SerializedSubject,以防您从不同的线程调用 notifyNewEvent(),否则您可以使用 BehaviourSubject

SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create())

public void initialize() {
    // Since you access your incoming event from doOnCompleted,
    // need this extra flatMap function so that you can access your event
    // outside rx java chain.
    subject.flatMap(new Func1() {
        @Override
        public Observable call(StoreChangeEvent event) {
            return Observable
                    .just(database.getAllPoints())
                    .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                            .map(result -> deletePoint(result))
                            .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                            .doOnCompleted(() -> emitStoreChange(finalEvent)));
        }
    })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.from(backgroundLooper))
            .subscribe();
}

public void notifyNewEvent(StoreChangeEvent event) {
    subject.onNext(event);
}

选项 2 - 执行者

如果您不访问 UI,为什么还要为主题和 observeOn、subscribeOn 而烦恼。创建一个具有一个线程的执行器(所有任务随后执行),并将任务提交给它,在那里利用 RxJava 的有用功能。

ExecutorService executorService = Executors.newSingleThreadExecutor();

public void notifyNewEvent(final StoreChangeEvent event) {
    executorService.execute(new Runnable() {
        public void run() {
            Observable.just(database.getAllPoints())
                // Blocking, so that the thread doesn't exit
                // and blocks on subscribe() till completion.
                .toBlocking() 
                .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(finalEvent)))
                .subscribe();
        }
    });
}

关于android - 使 RxJava 异步任务线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39456532/

相关文章:

android - 在 Android 上运行 TensorFlow 模型

android - Android Studio Gradle 4.1更新以在Project Regard APT上给出错误

c# - 线程安全静态类和构造函数

android - 将无限次调用的回调转换为 Observable

java - 如何一次向BehaviorSubject提供多个值

spring - 在RxJava Services中管理事务性的正确方法是什么?

Android SupportMapFragment inside Fragment

Android Play 商店 - 从服务器检索信息时出错。远程过程调用 :S-2:AEC-2

c# - 线程本地存储和本地方法变量

java - 如何逐步执行所有线程