java - 同步定期更新和创建、更新、删除操作

标签 java rx-java2 synchronized thread-synchronization

我使用Flowable链定期调用用户任务的请求(API)。

Flowable.interval(60, TimeUnit.SECONDS)
.flatMapCompletable(_tick ->
    mUserRepository.getAllUsers()
    .flatMapObservable(Observable::fromIterable)
    .flatMap(user ->
      downloadAndPersistTasks(user)
      .subscribeOn(Schedulers.io())
    )
    .subscribeOn(Schedulers.io())
  , false, 1
);

方法downloadAndPersistData下载每个用户的当前任务,从数据库中删除所有旧任务并保留下载的任务。可以从服务器端更改用户的任务。

问题是下载时间相对较长。

本案例:

  1. 下载任务 - 开始
  2. 插入任务 - API 调用 + 保存到数据库 - 开始
  3. 插入任务 - API 调用 + 保存到数据库 - 结束
  4. 下载任务 - 结束(没有插入任务的记录)
  5. 将下载的记录写入数据库 -> 插入的任务被覆盖

我需要在执行插入API调用时禁用特定用户的数据下载,并在执行定期更新时禁用函数插入任务。

是否有任何 RxJava 解决方案或者是使用 Java 框架的 native 同步原语的最佳选择?但我不需要跳过定期更新,只需延迟它。

最佳答案

我假设任务插入步骤是在本地完成的,以及 downloadAndPersistTasks() .

让我们介绍一个类型联合:Either<L,R> 。它有静态工厂方法Either.<L>createLeft( L value )Either.<R>createRight( R value ) 。它还有类方法:isLeft() , isRight() ,和getLeft()/getRight()做自然的事。

一个Subject用于注入(inject)插入任务步骤:

PublishSubject<InsertTask> taskInserter = PublishSubject.create();

然后我们通过以下方式组合它们:

Flowable.timer(60, SECONDS)
  .flatMap( tick -> 
      Observable.fromIterable( mUserRepository.getAllUsers() ), 1 )
  .map( t -> Either.createLeft(t) )
  .mergeWith( taskInserter.map( i -> Either.createRight( i ) ), 1 )
  .observeOn( scheduler )
  .subscribe( ti -> {
    if ( ti.isLeft() ) {
      downloadAndPersistTasks( ti.getLeft() );
    } else {
      insertTask( ti.getRight() );
    }
   );

flatMap() 的第二个参数确保一次仅处理一个用户,mergeWith() 也是如此。运算符(operator)。合并第二个流可确保一次仅执行一项操作,并且 observeOn()运算符将所有操作放在同一个线程上,因此不会出现争用。

如果您需要更大的并行性或更细粒度的控制,您可能需要为每个用户引入一个可观察对象。

关于java - 同步定期更新和创建、更新、删除操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49632936/

相关文章:

java - DOM解析说明

java - 使用 JNI 仅将缓冲区的一部分从 native 代码复制到 Java

java - 正则表达式返回一个 boolean 值而不是一个字符串

java - 忽略 Observable<Single<T>> 中的错误

java - 跳出同步块(synchronized block)?

Java处理延时绘图不工作

java - RxJava2 中的 flatMapPublisher 是什么?

android - DiffUtil 和 registerAdapterDataObserver

java - 使用同步列表

scala - Scala 中的 SynchronizedSet 和 set 操作