ios - 快速组合 : Unexpected backpressure behaviour with zip operator

标签 ios swift reactive-programming combine

我有一个关于 Combine 中的 zip 运算符与背压结合的问题。

采用以下代码片段:

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

subject.send("a")

在 playground 中执行此操作时,输出如下:

receive subscription: (0..<9223372036854775807)
receive subscription: (Zip)
request unlimited
request unlimited
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: (5)
receive value: (6)
receive value: (7)
...

在 iOS 设备上执行时,由于内存问题,代码会在几秒后崩溃。

在上面的第四行中可以看出根本原因,其中 zipsequencePublisher 请求无限量的值。由于 sequencePublisher 提供整个范围的 Int 值,这会导致内存溢出。

我想知道的:

  • zip 在组合它们并推送它们之前等待每个发布者的一个值
  • 背压用于控制订阅者对发布者的需求

我的期望是 zip 只从每个发布者请求一个值,等待它们到达,并且只在从每个发布者收到一个值时才请求下一个值。

在这种特殊情况下,我尝试构建一种行为,其中将序列号分配给 subject 产生的每个值。但是,我可以想象,当 zip 组合来自发布频率非常不同的发布者的值时,这总是一个问题。

zip 运算符中使用背压似乎是解决该问题的完美工具。你知道为什么不是这样吗?这是错误还是故意的?如果是故意的,为什么?

谢谢大家

最佳答案

似乎序列发布者只是不切实际。它似乎对背压没有反应;它只是一次吐出整个序列,这在一个发布应该是异步的世界中毫无意义。如果将 Int.max 更改为 3,则没有问题。 :) 我不知道这是一个错误还是序列发布者整个概念中的一个缺陷。

但是,对于您的实际用例来说确实没有问题,因为有一种更好的方法可以为主题的每个发射分配一个连续的数字,即 scan

这是一个更现实的方法:

func delay(_ delay:Double, closure:@escaping ()->()) {
    let when = DispatchTime.now() + delay
    DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController : UIViewController {
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()
        let subject = PassthroughSubject<String, Never>()
        subject.scan(("",0)) {t,s in (s,t.1+1)}
            .sink { print($0.0, $0.1)
            }.store(in:&storage)
        delay(1) {
            subject.send("a") // a 1
            delay(1) {
                subject.send("b") // b 2
            }
        }
    }
}

假设您有其他一些原因需要每个连续的枚举通过管道向下传递。但是,如果您的唯一目标是枚举每个到达sink 本身的信号,您可以让sink 本身维护一个计数器(这很容易做到,因为它是一个闭包):

    var storage = Set<AnyCancellable>()
    let subject = PassthroughSubject<String, Never>()
    override func viewDidLoad() {
        super.viewDidLoad()
        var counter = 1
        subject
            .sink {print($0, counter); counter += 1}
            .store(in:&storage)
        delay(1) {
            self.subject.send("a") // a 1
            self.subject.send("b") // b 2
        }
    }

关于ios - 快速组合 : Unexpected backpressure behaviour with zip operator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59676570/

相关文章:

使用 ssl 的 Spring 5 WebClient

javascript - 无法使react-chartjs选项生效

c# - 统计一个主题的所有订阅

ios - iOS 13 和 iPhone 11 Pro 模拟器下的 tableView.rowHeight = UITableView.automaticDimension 导致无限更新 subview 循环

swift - 使用 Swift,为什么按下一个 UIStepper 会影响两个步进器?

json - "Expected to decode String but found a dictionary instead."

ios - Realm 中的动态属性

objective-c - UIWebView 的 allowInlineMediaPlayback 属性在 iPad 上的 4.2 上不起作用

ios - Apple Pay检测钱包没有信用卡

html - 解析包含 html 数据的 JSON