swift - Why does `Publishers.Map` consume upstream values eagerly?

Tags: swift combine backpressure

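Suppose I have a custom subscriber that requests one value on subscription and then requests one more value three seconds after receiving the previous one: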

import Combine
import Foundation

class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        print("Subscribed")

        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")

        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            self.subscription?.request(.max(1))
        }

        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }
}

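If I use it to subscribe to an infinite range publisher, backpressure is handled gracefully: the publisher waits 3 seconds each time, until it receives the next request to send a value: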

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
//     Subscribed
//     Value: 1
//     Value: 2
//     Value: 3
//     ...

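But if I add a map operator, MySubscriber never even receives its subscription. map appears to synchronously request `Subscribers.Demand.unlimited` as soon as it receives its own subscription, and the app spins forever as map tries to exhaust the infinite range: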

(1...).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// The `map` transform is executed infinitely with no delay:
//
//     Map: 1
//     Map: 2
//     Map: 3
//     ...

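My question is: why does map behave this way? I would have expected map to simply pass its downstream demand through to the upstream. Since map is meant for transformation rather than side effects, I don't see what use case its current behavior serves.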

EDIT

I implemented a version of map to show how I think it should work:

extension Publishers {
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        let upstream: Upstream
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
            upstream.receive(subscriber: mapSubscriber)
        }
    }
}

extension Subscribers {
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        let downstream: DownstreamSubscriber
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        func receive(_ input: Input) -> Subscribers.Demand {
            downstream.receive(transform(input))
        }

        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

extension Publisher {
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
    }
}

With this operator, MySubscriber receives its subscription immediately, and the mapLazily transform runs only when there is demand:

(1...).publisher
    .mapLazily { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
//     Subscribed
//     Map: 1
//     Value: 2
//     Map: 2
//     Value: 4
//     Map: 3
//     Value: 6
//     Map: 4
//     Value: 8

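My guess is that the particular overload of map defined for Publishers.Sequence takes some kind of shortcut to improve performance. That breaks infinite sequences, but even for finite sequences, eagerly exhausting the sequence regardless of downstream demand goes against my intuition. To my mind, the following code: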

(1...3).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

should print:

Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete

but instead it prints:

Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete
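One way to check the guess above: a minimal sketch, assuming Combine's `Publishers.Sequence` has its own `map(_:)` overload that returns another `Publishers.Sequence` (here assumed to be `Publishers.Sequence<[Int], Never>`) rather than a `Publishers.Map`. `Publishers.Map` has no `sequence` property, so the last line below only compiles if the compiler picked that Sequence-specific overload by default.

```swift
import Combine

// No type annotation: the compiler chooses whichever `map` overload it ranks best.
let mapped = (1...3).publisher.map { $0 * 2 }

// Assumption: the chosen overload returned a `Publishers.Sequence` whose
// elements were transformed into an array up front. `Publishers.Map` has no
// `sequence` property, so this line would not compile otherwise.
print(mapped.sequence) // [2, 4, 6]
```

If this holds, the transform runs over the whole upstream sequence when the operator is applied, before any subscriber exists. That matches `Map: 1` to `Map: 3` being printed before `Subscribed`, and it explains the infinite case: mapping `(1...)` into an array can never finish.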

Best answer

Here is a simpler test that doesn't involve any custom subscriber:

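import Combine
import Foundation

var storage = Set<AnyCancellable>()

(1...).publisher
    //.map { $0 }
    .flatMap(maxPublishers: .max(1)) { (i: Int) -> AnyPublisher<Int, Never> in
        // Delay each value by 3 seconds; maxPublishers: .max(1) limits upstream demand.
        Just<Int>(i)
            .delay(for: 3, scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
    .sink { print($0) }
    .store(in: &storage)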

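It works as expected, but if you uncomment the .map, you get nothing at all, because the .map operator accumulates infinitely many upstream values without ever publishing anything.

Going on your hypothesis that map is somehow optimized for a preceding sequence publisher, I tried the following workaround: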

(1...).publisher.eraseToAnyPublisher()
    .map { $0 }
    // ...

Sure enough, that fixed it! By hiding the sequence publisher from the map operator, we prevent the optimization.
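A minimal sketch of the workaround under bounded demand, assuming that once the sequence is type-erased, `map` resolves to the generic `Publisher.map` (a `Publishers.Map`) that forwards downstream demand upstream. `TwoValueSubscriber` and `runErasedMap` are hypothetical names for this illustration. The subscriber requests exactly two values from the infinite range, so the transform should run only twice.

```swift
import Combine

// Hypothetical subscriber: asks for exactly two values and never more,
// so any extra upstream work would show up in the transform count.
final class TwoValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        return .none // no additional demand
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

// Hypothetical helper: counts how many times the `map` transform runs.
func runErasedMap() -> (transforms: Int, values: [Int]) {
    var transforms = 0
    let subscriber = TwoValueSubscriber()

    (1...).publisher
        .eraseToAnyPublisher() // hides Publishers.Sequence, so the generic map is used
        .map { value -> Int in
            transforms += 1
            return value * 2
        }
        .subscribe(subscriber)

    // The sequence publisher emits synchronously inside `request`, so the
    // bounded demand has already been served when `subscribe` returns.
    return (transforms, subscriber.values)
}

let result = runErasedMap()
print(result.transforms, result.values) // 2 [2, 4]
```

Erasing the type costs the Sequence-specific fast path, but in exchange the transform runs only for values that were actually demanded.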

See the corresponding question on Stack Overflow: https://stackoverflow.com/questions/61143246/
