我有以下调度程序:
let scheduler = ConcurrentDispatchQueueScheduler(queue: .global(qos: .background))
尝试过:observeOn(scheduler)
还有:subscribeOn(scheduler)
由于调度程序的原因,我预计来自订阅的回调将在某些后台线程上执行。
func signIn(withExternalUserID userID: UserID, authenticationToken: String) -> Single<QBUUser> {
let authenticationToken = "Bearer \(authenticationToken)"
return .create { observable in
let request = QBRequest.logIn(
withUserLogin: "\(userID)",
password: authenticationToken,
successBlock: { _, user in
user.password = QBSession.current.sessionDetails?.token
observable(.success(user)) // <- Here we are on the main thread
}, errorBlock: { [unowned self] in
observable(.error(ChatError.signInError(self.resolve(errorResponse: $0)))) // <- same here: main thread
}
)
return Disposables.create {
request.cancel()
}
}
}
因为内部API的回调是在主线程中执行的,所以我所有的订阅而不是后台线程也是在主线程中执行的。
tokenRequest
.flatMap { [unowned self] token -> Single<QBUUser> in
return self.chatService.signIn(withExternalUserID: currentUser.id, authenticationToken: token)
}
.flatMap { [unowned self] user -> Single<QBUUser> in
return self.chatService.connect(user: user).andThen(Single.just(user))
}
.observeOn(self.scheduler)
.subscribe(onNext: { [unowned self] user in
self.chatStorage.set(currentQBUser: user, currentUser: currentUser)
completable(.completed)
self.loginSempahore.signal()
}, onError: { [unowned self] error in
self.chatStorage.unsetCurrentQBUser()
completable(.error(error))
self.loginSempahore.signal()
})
.disposed(by: self.disposeBag)
强制来自 flatMap 的所有回调并订阅由后台线程执行的正确方法是什么?
最佳答案
除非库为您提供了执行此操作的选项,否则无法强制函数在特定线程上调用其回调。您的 successBlock
和 errorBlock
闭包将在 QBRequest.logIn
想要调用它们的任何线程上调用,而您对此无能为力。
也就是说,.observeOn(_:)
运算符会将执行转移到不同的线程,因此在上一个代码示例中,您的 onNext
和 onError
闭包将在 self.scheduler
线程上执行。
关于rx-swift - RxSwift subscribeOn 和observeOn 不在预期的后台线程上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64668094/