c# - 在没有 onBackpressureLatest 的情况下处理 Rx.NET 中的背压

标签 c# f# system.reactive reactive-programming

我需要在 Rx.NET 中实现以下算法:

  1. stream 中获取最新的项目,如果没有新项目,则不阻塞地等待新项目。只有最新的项目很重要,其他的可以放弃。
  2. 将项目输入到 SlowFunction 并打印输出。
  3. 从第 1 步开始重复。

天真的解决方案是:

let PrintLatestData (stream: IObservable<_>) =
    stream.Select(SlowFunction).Subscribe(printfn "%A")

但是,此解决方案不起作用,因为平均而言 stream 发出项目的速度比 SlowFunction 消耗它们的速度快。由于 Select 不会丢弃项目,而是尝试按从旧到新的顺序处理每个项目,因此在程序运行时,项目发出和打印之间的延迟将增长到无穷大。应仅从流中获取最新的最近项,以避免这种无限增长的背压。

我搜索了文档并在 RxJava 中找到了一个名为 onBackpressureLatest 的方法,根据我的理解,它可以执行我上面描述的操作。但是,该方法在 Rx.NET 中不存在。如何在 Rx.NET 中实现它?

最佳答案

我想你想使用像 ObserveLatestOn 这样的东西。它有效地用单个值和一个标志替换了传入事件队列。

James World 在这里发表了博客 http://www.zerobugbuild.com/?p=192

这个概念在 GUI 应用程序中大量使用,它们不能相信服务器向其推送数据的速度有多快。

您还可以在 Reactive Trader 中看到一个实现 https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 以及解释 ReactiveTrader 的支持演示文稿 https://leecampbell.com/presentations/#ReactConfLondon2014

需要明确的是,这是一种负载卸载算法,而不是背压算法。

关于c# - 在没有 onBackpressureLatest 的情况下处理 Rx.NET 中的背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42236757/

相关文章:

c# - 在 C# 中处理视频源的资源

c# - 从客户端检测到具有潜在危险的 Request.Form 值

f# - 如何在 F# 中定义和实现返回无类型 IEnumerable 的接口(interface)方法?

f# - 如何在 fsi 中取消定义?

c# - 减少受试者同步的痛苦

rabbitmq - RX 与 rabbitmq 或 zeromq 等消息队列的对比?

c# - 为什么我不能以相同的方式初始化对象(不使用 ref)?

C#:SQL - 一次一个查询与命令文本中的多个查询?

generics - F#:帮助创建看似动态类型的成员约束

c# - 在一个时间跨度内触发两个具有先决条件的事件时订阅