f# - 如何更改 Rx Builder 实现来修复堆栈溢出异常?

标签 f# system.reactive computation-expression

我正在尝试开发一个 Rx Builder,以便在 F# 计算表达式语法中使用响应式扩展。我该如何修复它,以免堆栈崩溃?就像下面的 Seq 例子一样。 是否有计划提供 RxBuilder 的实现作为响应式扩展的一部分或作为 .NET Framework future 版本的一部分?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

最佳答案

一个简短的答案是,Rx 框架不支持使用这样的递归模式生成可观察量,因此它不容易完成。用于 F# 序列的Combine 操作需要一些可观察量不提供的特殊处理。 Rx 框架可能希望您使用 Observable.Generate 生成可观察量,然后使用 LINQ 查询/F# 计算生成器来处理它们。

无论如何,这里有一些想法 -

首先,您需要将 Observable.merge 替换为 Observable.Concat。第一个并行运行两个可观察量,而第二个首先从第一个可观察量产生所有值,然后从第二个可观察量产生值。进行此更改后,该代码片段在堆栈溢出之前至少会打印约 800 个数字。

堆栈溢出的原因是 Concat 创建一个调用 Concat 的可观察量来创建另一个调用 Concat 的可观察量,等等。一种方法解决这个问题的方法是添加一些同步。如果您使用的是 Windows 窗体,则可以修改 Delay 以便它在 GUI 线程上安排可观察对象(这会丢弃当前堆栈)。这是一个草图:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

要正确实现此功能,您必须编写自己的 Concat 方法,这非常复杂。这个想法是:

  • Concat 返回一些特殊类型,例如IConcatenatedObservable
  • 当递归调用该方法时,您将创建一个相互引用的 IConcatenatedObservable
  • Concat 方法将查找该链,并且当存在例如三个对象,它会删除中间的一个(以始终保持链的长度最多为 2)。

这对于 StackOverflow 的答案来说有点太复杂,但对于 Rx 团队来说可能是一个有用的反馈。

关于f# - 如何更改 Rx Builder 实现来修复堆栈溢出异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6162288/

相关文章:

algorithm - F# 数学库 - 计算中位数

c# - 使用静态类型语言 (F#) 处理异构数据

F# 整数比较

c# - 结合 boolean Observables

haskell - Haskell 程序员的计算表达式

f# - 使用 FSharp.Configuration 类型提供程序

c# - 使用 RX 组成命令总线

c# - 使用 IObservable (Rx) 作为 MVVM 的 INotifyCollectionChanged 替代品?

haskell - 学术用途之外的延续单子(monad)是否有现实世界的适用性?

f# - 计算表达式 vs 应用仿函数等等