swift - 缓冲 Observable 直到另一个 Observable 完成

标签 swift rx-swift reactivex

我正在使用 RxSwift 包装移动应用程序的服务器同步过程。我有一个 Observable<RemoteEvent>包装一个 websocket 连接并发出每条收到的消息作为 Event .同样,我有一个 Observable<SynchronizationResult>包装 API 同步过程。一旦我的应用程序打开 WebSocket 连接,服务器就会发送一个 hello信息。收到该消息后,我想启动同步过程并缓冲所有事件,直到同步完成。这就是我挣扎的地方。目前,我有:

self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in
  guard event.type == "hello" else {
    return (state.0?.concat(Observable.just(event)), state.1 + [event])
  }

  // This is the sync operation
  return (
    self.synchronizationService
      .synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true)
      .toArray()
      .flatMap { results -> Observable<RemoteEvent> in
        (state.1 + [event]).toObservable()
      },
    []
  )
}
.flatMapLatest { $0.0 ?? Observable.empty() }

尽管这相当丑陋,但它也有一个重大错误:任何 传入事件都会导致同步 Observable被重新订阅,然后重新启动整个同步过程。我相信一定有更好的方法来做到这一点。

最佳答案

以下是获得所需功能的方法:

// this is a stub for the purpose of the example
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let websocketEvents = interval
    .map { i -> String in
        if i == 1 {
            return "hello"
        } else {
            return String(i)
        }
    }
    .replayAll()

websocketEvents.connect()

func performSync() -> Observable<Void> {
    return Observable<Void>.create { o in
        print("starting sync")
        // actually start sync with server
        // ....
        delay(2.0) {
            print("sync finished")
            o.onNext(())
        }
        return NopDisposable.instance
    }
}

// websocket events as they come, regardless of sync status
websocketEvents
    .subscribeNext { e in
        print("websocket event received: \(e)")
    }

// all websocket events, buffered and only emitted post-sync
websocketEvents
    .filter { $0 == "hello" }
    .flatMapLatest { _ in performSync() }
    .flatMapLatest { _ in websocketEvents }
    .subscribeNext { e in
        print("websocket event post sync: \(e)")
    }

这将输出:

websocket event received: 0
websocket event received: hello
starting sync
websocket event received: 2
websocket event received: 3
sync finished
websocket event post sync: 0
websocket event post sync: hello
websocket event post sync: 2
websocket event post sync: 3
websocket event received: 4
websocket event post sync: 4
websocket event received: 5
websocket event post sync: 5

关于swift - 缓冲 Observable 直到另一个 Observable 完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39184148/

相关文章:

ios - 在主线程上同步启动 NSOperation 时出现奇怪的间歇性崩溃

ios - fetchedResultsController.fetchedObjects 在 iOS 10 beta 1 和 Swift 2.3 上崩溃

swift - RxSwift - 映射后函数的调用顺序

ios - drive 只能从 MainThread 调用

javascript - Rxjs 合并()不工作

caching - 在 Angular2 中使用 RxJs 5 为 REST 客户端提供基于时间的缓存

swift - 无法在标签栏 Swift 2 中将文本添加到 ListView

ios - Swift框架的创建和使用

ios - Observable 不响应 .drive 方法

javascript - 合并的可观察量不包含序列,即使合并者包含序列