swift - 使用 RxSwift 限制对服务类的并发访问

标签 swift reactive-programming rx-swift

给定一个这样的服务类:

class Service {
    let networkService = NetworkService()

    func handleJobA(input: String) -> Observable<ResultA> {
        return networkService
            .computeA(input)
            .map { $0.a }
    }
}

当我像这样从调用方使用它时:

let service = Service()

Observable
    .from(["Hello", "World"])
    .flatMap {
        service.handleJobA($0)
    }
    .subscribe()

然后这将同时向service 发送多个请求。我想让流等到每个请求完成。这可以使用 merge 运算符实现。

Observable
    .from(["Hello", "World"])
    .flatMap {
        Observable.just(
            service.handleJobA($0)
        )
    }
    .merge(maxConcurrent: 1)
    .subscribe()

到目前为止一切顺利 - 服务不会同时执行多个 handleJobA 任务。

然而,并发是一个服务细节,调用者不应该关心它。事实上,该服务在稍后阶段可能会决定允许不同的并发值。

其次,当我添加一个新方法 handleJobB 时,它不能与作业 A 同时处于事件状态,反之亦然。

所以我的问题是:

  1. 如何将 maxConcurrency 限制为 handleJobA observable 作为实现细节?
  2. 哪个 RxSwift 模式允许对任何服务方法进行限制?

最佳答案

您需要一个专用于该服务的串行调度程序。这是一个可以粘贴到 Playground 的示例:

/// playground

import RxSwift

class Service {

    func handleJobA(input: String) -> Observable<String> {

        return Observable.create { observer in
            print("start job a")
            sleep(3)
            observer.onNext(input)
            print("complete job a")
            observer.onCompleted()
            return Disposables.create()
        }.subscribeOn(scheduler)
    }

    func handleJobB(input: String) -> Observable<String> {
        return Observable.create { observer in
            print("start job b")
            sleep(3)
            observer.onNext(input)
            print("complete job b")
            observer.onCompleted()
            return Disposables.create()
            return Disposables.create()
        }.subscribeOn(scheduler)
    }

    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}


let service = Service()

_ = Observable.from(["hello","world","swift"])
    .flatMap { service.handleJobA(input: $0) }
    .subscribe(onNext:{
        print("result " + $0)
    })

_ = Observable.from(["hello","world","swift"])
    .flatMap { service.handleJobB(input: $0) }
    .subscribe(onNext:{
        print("result " + $0)
    })

import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

关于swift - 使用 RxSwift 限制对服务类的并发访问,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43794392/

相关文章:

swift - 如何在选择器事件中添加事件 "send to messenger"?

swift - RxSwift .map 表达式过于复杂,无法在合理的时间内解决;考虑将表达式分解为不同的子表达式

swift - 在存在 UITapGestureRecognizer 的情况下选择 UICollectionViewCell

ios - 为什么 label.text 更新没有反射(reflect)在屏幕上?

ios - 如何在 SwiftUI ZStack 中对齐文本以适应较小的屏幕尺寸?

arrays - 使用 swift 解析 xml 数组并将数据存储到数组或字典中

java - 如何使用来自 ReactiveSecurityContextHolder.getContext() 的参数调用 void 方法

java - rxjava2 observable.just(a Boolean) 不是

swift - 什么是 RxSwift 3.0 等同于 RxSwift 2.x 的 AnonymousDisposable?