swift - Rx swift : Nested Queries and ReplaySubject

标签 swift reactive-programming rx-swift behaviorsubject

我必须使用三个单独的 API 请求获取三种类型的数据(ATypeBTypeCType)。 API 返回的对象是一对多关联的:

  • 1 AType对象是 N BType 的父级对象
  • 1 BType对象是 P 的父级 CType对象)

我使用以下三个函数来获取每种类型:

func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}

为了避免嵌套订阅,这三个函数使用 flatMap 链接起来:

func getAll() -> Observable<CType> {
  return self.get_A_objects()
     .flatMap { (aa:AType) in  return get_B_objects(aa.id) }
     .flatMap { (bb:BType) in  return get_C_objects(bb.id) }
}

func setup() {
  self.getAll().subscribeNext { _ in
    print ("One more item fetched") 
  }
}

AType 有 M 个对象时,上面的代码工作正常, 我可以看到文字 "One more item fetched"打印 MxNxP 次。

我想设置 getAll()使用 ReplaySubject<String> 传递整个链状态更新的函数.我最初的想法是写这样的东西:

func getAll() -> ReplaySubject<String> {
  let msg = ReplaySubject<String>.createUnbounded()
  self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
    .flatMap { (aa:AType) in 
       return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
    }
    .flatMap { (bb:BType) in
       return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
    }

  return msg
}

但是这次尝试失败了,即下面的 print()不打印任何东西。

getAll().subscribeNext {
  print ($0)
}

我应该如何重写我的逻辑?

最佳答案

问题

这是因为你没有保留你的 Disposable s,所以它们会立即被释放,因此什么都不做。

getAll , 你创建一个 Observable<AType>通过get_A_objects() , 但它没有添加到 DisposeBag .当它超出范围时(在 func 的末尾),它将被释放。所以{ aobj in msg.onNext ("Fetching A \(aobj)") }永远不会发生(或者至少不太可能发生,如果它是异步的)。

此外,您没有保留 ReplaySubject<String>getAll().subscribeNext 返回任何一个。因此,出于同样的原因,这也会破坏交易。

解决方案

因为你想要两个 Observable s:一个用于实际最终结果(Observable<CType>),一个用于进度状态(ReplaySubject<String>),您应该从 getAll() 返回两者功能,以便两者都可以“拥有”,并管理它们的生命周期。

func getAll() -> (Observable<CType>, ReplaySubject<String>) {
    let progress = ReplaySubject<String>.createUnbounded()
    let results = self.get_A_objects()......
    return (results, progress)
}

let (results, progress) = getAll()

progress
    .subscribeNext {
        print ($0)
    }
    .addDisposableTo(disposeBag)

results
    .subscribeNext {
        print ($0)
    }
    .addDisposableTo(disposeBag)

一些注意事项:

  • 您不需要使用 createUnbounded ,如果您不小心,可能会很危险。
  • 你可能真的不想使用 ReplaySubject完全没有,因为如果有人在之后订阅并收到旧的进度状态消息,那么说您稍后会“获取”某些内容是一个谎言。考虑使用 PublishSubject .
  • 如果您遵循上述建议,那么您只需确保订阅progress即可。之前results确保您不会错过任何进度状态消息,因为输出将不再被缓冲。
  • 此外,这只是我的意见,但我会将“获取 X Y”重新表述为其他内容,因为您不是“获取”,但您已经“获取ed”它。

关于swift - Rx swift : Nested Queries and ReplaySubject,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38208551/

相关文章:

ios - 如何在 iOS 中自己创建的模型类中使用泛型?

swift - UIImageView 的 If 语句

.net - Rx 与响应式编程有什么关系?

ios - 如何实现具有通用约束类型属性的 Swift 协议(protocol)?

ios - RxSwift - 如何具有可读的 mergeLatest 事件元素变量名称?

ios - Alamofire 和 Objectmapper 将数据添加到 TableView

swift - UICollectionViewCell 注册类失败,但注册 nib 有效

javascript - 从 RxJS promise 流

javascript - 无法使用 array.map 读取未定义的属性

swift - 如何在 MVVM 架构中使用 RxSwift 将参数发送到 View 模型?