我有一个关于 subscribe(on:options:) 的问题运算符(operator)。如果有人能帮我弄清楚,我将不胜感激。
所以我们从文档中得到的:
Specifies the scheduler on which to perform subscribe, cancel, and request operations. In contrast with receive(on:options:), which affects downstream messages, subscribe(on:options:) changes the execution context of upstream messages.
另外,我从不同的文章中得到的是,除非我们明确指定
Scheduler
接收我们的下游消息(使用 receive(on:options:)
),消息将发送到 Scheduler
用于接收订阅。此信息与我在执行期间实际获得的信息不一致。
我有下一个代码:
Just("Some text")
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.subscribe(on: DispatchQueue.global())
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
我希望下一个输出:Map: false
Sink: false
但相反,我得到:Map: true
Sink: false
当我使用 Sequence
时也会发生同样的事情出版商。如果我交换
map
的位置运算符和 subscribe
运营商,我收到我想要的:Just("Some text")
.subscribe(on: DispatchQueue.global())
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
输出:Map: false
Sink: false
有趣的事实是,当我使用与我的自定义发布者的第一个列表中相同的运算符顺序时,我收到了我想要的行为:struct TestJust<Output>: Publisher {
typealias Failure = Never
private let value: Output
init(_ output: Output) {
self.value = output
}
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscriptions.empty)
_ = subscriber.receive(value)
subscriber.receive(completion: .finished)
}
}
TestJust("Some text")
.map { _ in
print("Map: \(Thread.isMainThread)")
}
.subscribe(on: DispatchQueue.global())
.sink { _ in
print("Sink: \(Thread.isMainThread)")
}
.store(in: &subscriptions)
输出:Map: false
Sink: false
所以我认为要么我对所有这些机制完全误解,要么一些发布者故意选择线程来发布值( Just
, Sequence
-> Main
, URLSession.DataTaskPublisher
-> Some of Background
),对我来说没有意义,因为在这种情况下我们为什么需要这个 subscribe(on:options:)
为了。你能帮我理解我错过了什么吗?先感谢您。
最佳答案
首先要理解的是,消息既沿管道向上流动,又沿管道向下流动。沿管道(“上游”)向上流动的消息是:
流向管道(“下游”)的消息是:
好的,正如文档中明确指出的那样,
subscribe(on:)
是关于前者:向上流动的消息。但是您实际上并没有在测试中跟踪任何这些消息,因此您的结果都没有反射(reflect)有关它们的任何信息!插入适当的 handleEvents
订阅点上方的运算符,以查看内容沿管道向上流动(例如,实现其 receiveRequest:
参数):Just("Some text")
.handleEvents(receiveRequest: {
_ in print("Handle1: \(Thread.isMainThread)")
})
.map // etc.
同时,您不应该对消息将流向下游的线程(即值和完成)做任何假设。你说:Also, what I got from different articles is that unless we explicitly specify the Scheduler to receive our downstream messages on (using
receive(on:options:)
), messages will be send on the Scheduler used for receiving a subscription.
但这似乎是一个虚假的假设。您的代码没有任何内容可以明确地确定下游发送线程。正如你所说的,你可以用
receive(on:)
来控制它。 ,但如果你不这样做,我会说你不能对这件事做任何假设。一些发布者确实会在后台线程上产生一个值,例如数据任务发布者,这是完全合理的(数据任务完成处理程序也会发生同样的事情)。其他人没有。您可以假设
receive(on:)
以外的运算符通常不会改变值传递线程。但是运营商是否以及如何使用订阅线程来确定接收线程,这是你不应该假设的。要控制接收线程,请控制它!调用 receive(on:)
或假设什么。举个例子,如果你改变你的开口
Just("Some text")
.receive(on: DispatchQueue.main)
那么你的map
和你的sink
将报告他们正在主线程上接收值。为什么?因为你控制了接收线程。无论您在任何 subscribe(on:)
中说什么,这都有效。命令。它们完全是不同的事情。也许如果您调用
subscribe(on:)
但你不调用receive(on:)
,关于下游发送线程的一些事情是由subscribe(on:)
决定的。线程,但我肯定不会依赖任何关于它的硬性规定;文档中没有这样说!相反,不要那样做。如果您实现 subscribe(on:)
, 实现 receive(on:)
也是为了让你负责发生的事情。
关于swift - 合并的订阅(在 :options:) operator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65851027/