我试图弄清楚如果 getStream() 和 update(value: ...) 将在不同线程上同时调用,这种方法是否线程安全?
final class SomeNotifier {
static let shared = SomeNotifier()
private let value = PassthroughSubject<String, Never>()
private var cancellables: Set<AnyCancellable> = []
private init() {}
func getStream() -> AsyncStream<String> {
return AsyncStream { [weak self] continuation in
guard let self = self else { return }
self.value.sink { completion in
switch completion {
case .finished:
continuation.finish()
case .failure:
continuation.finish()
}
} receiveValue: { value in
continuation.yield(value)
}
.store(in: &cancellables)
}
}
func update(value: String) {
self.value.send(value)
}
我想要一些存储库,可以通知不同的观察者内部状态的变化
最佳答案
Cy-4AH 的变体 answer (随后被删除),它使用 actor
进行同步,我会确保添加一个 onTermination
处理程序,以便在异步序列被取消时删除关联的延续。例如:
actor Notifier<Output> {
private var continuations: [UUID: AsyncStream<Output>.Continuation] = [:]
func values() -> AsyncStream<Output> {
AsyncStream { continuation in
let id = UUID()
continuations[id] = continuation
continuation.onTermination = { _ in
Task { await self.cancel(id) }
}
}
}
func send(_ value: Output) {
for continuation in continuations.values {
continuation.yield(value)
}
}
}
private extension Notifier {
func cancel(_ id: UUID) {
continuations[id] = nil
}
}
主题有很多变化,但实现的细节不如以下一般观察重要:(a) 使用 actor
; (b) 使用 onTermination
处理程序进行清理,以防通知程序对象可能比各个序列的生命周期更长。
FWIW,如果我真的想为 String
通知创建一个单例:
final class StringNotifier: Sendable {
static let shared = StringNotifier()
private init() { }
private let notifier = Notifier<String>()
func values() async -> AsyncStream<String> {
await notifier.values()
}
func send(_ value: String) {
Task { await notifier.send(value) }
}
}
顺便说一句,我通常更喜欢使用 AsyncChannel
对于这些类似“主题”的行为,但是单个 channel 不允许多个观察者,并且如果您尝试收集这些 channel ,它(当前)不提供所需的 onTermination
之类处理程序。
FWIW,如果您要使用合并PassthroughSubject
,它可能看起来像:
actor CombineNotifier<Output> {
private let subject = PassthroughSubject<Output, Never>()
private var cancellables: [UUID: AnyCancellable] = [:]
func values() -> AsyncStream<Output> {
AsyncStream { continuation in
let id = UUID()
cancellables[id] = subject.sink { _ in
continuation.finish()
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { _ in
Task { await self.cancel(id) }
}
}
}
func send(_ value: Output) {
subject.send(value)
}
}
private extension CombineNotifier {
func cancel(_ id: UUID) {
cancellables[id] = nil
}
}
同样,它是一个提供线程安全交互的actor
,并使用onTermination
来清理各个序列。
关于swift - 线程安全地将发布者组合到 AsyncStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75855208/