swift - 线程安全地将发布者组合到 AsyncStream

标签 swift combine swift-concurrency

我试图弄清楚如果 getStream() 和 update(value: ...) 将在不同线程上同时调用,这种方法是否线程安全?

final class SomeNotifier {

static let shared = SomeNotifier()

private let value = PassthroughSubject<String, Never>()
private var cancellables: Set<AnyCancellable> = []

private init() {}

func getStream() -> AsyncStream<String> {
    return AsyncStream { [weak self] continuation in
        guard let self = self else { return }

        self.value.sink { completion in
            switch completion {
            case .finished:
                continuation.finish()
            case .failure:
                continuation.finish()
            }
        } receiveValue: { value in
            continuation.yield(value)
        }
        .store(in: &cancellables)
    }
}

func update(value: String) {
    self.value.send(value)
}

我想要一些存储库,可以通知不同的观察者内部状态的变化

最佳答案

Cy-4AH 的变体 answer (随后被删除),它使用 actor 进行同步,我会确保添加一个 onTermination 处理程序,以便在异步序列被取消时删除关联的延续。例如:

actor Notifier<Output> {
    private var continuations: [UUID: AsyncStream<Output>.Continuation] = [:]

    func values() -> AsyncStream<Output> {
        AsyncStream { continuation in
            let id = UUID()
            continuations[id] = continuation

            continuation.onTermination = { _ in
                Task { await self.cancel(id) }
            }
        }
    }

    func send(_ value: Output) {
        for continuation in continuations.values {
            continuation.yield(value)
        }
    }
}

private extension Notifier {
    func cancel(_ id: UUID) {
        continuations[id] = nil
    }
}

主题有很多变化,但实现的细节不如以下一般观察重要:(a) 使用 actor; (b) 使用 onTermination 处理程序进行清理,以防通知程序对象可能比各个序列的生命周期更长。


FWIW,如果我真的想为 String 通知创建一个单例:

final class StringNotifier: Sendable {
    static let shared = StringNotifier()

    private init() { }

    private let notifier = Notifier<String>()

    func values() async -> AsyncStream<String> {
        await notifier.values()
    }

    func send(_ value: String) {
        Task { await notifier.send(value) }
    }
}

顺便说一句,我通常更喜欢使用 AsyncChannel对于这些类似“主题”的行为,但是单个 channel 不允许多个观察者,并且如果您尝试收集这些 channel ,它(当前)不提供所需的 onTermination 之类处理程序。


FWIW,如果您要使用合并PassthroughSubject,它可能看起来像:

actor CombineNotifier<Output> {
    private let subject = PassthroughSubject<Output, Never>()
    private var cancellables: [UUID: AnyCancellable] = [:]

    func values() -> AsyncStream<Output> {
        AsyncStream { continuation in
            let id = UUID()

            cancellables[id] = subject.sink { _ in
                continuation.finish()
            } receiveValue: { value in
                continuation.yield(value)
            }

            continuation.onTermination = { _ in
                Task { await self.cancel(id) }
            }
        }
    }

    func send(_ value: Output) {
        subject.send(value)
    }
}

private extension CombineNotifier {
    func cancel(_ id: UUID) {
        cancellables[id] = nil
    }
}

同样,它是一个提供线程安全交互的actor,并使用onTermination来清理各个序列。

关于swift - 线程安全地将发布者组合到 AsyncStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75855208/

相关文章:

ios - 我可以在 Swift 中使用 actor 始终在主线程上调用函数吗?

swift - 何时将非隔离与参与者的存储属性一起使用?

ios - 为什么 SwiftUI View 模型应该用 @MainActor 注释?

arrays - 检查数组是否包含 Swift 中字符串的一部分?

ios - 如何使用 Swift 在背景图像上设置 alpha

ios - SpriteKit 场景没有重置?

Swift 结合订阅、正确的流程和架构选择

swift - 传递给泛型函数时如何访问@Published 属性的包装值

ios - 如何添加前导填充以在 UIStackView 中添加 View

swift - 从 View 模型中关闭 View [MODAL PAGE]