我正在尝试开发一个 Rx Builder,以便在 F# 计算表达式语法中使用响应式扩展。我该如何修复它,以免堆栈崩溃?就像下面的 Seq 例子一样。 是否有计划提供 RxBuilder 的实现作为响应式扩展的一部分或作为 .NET Framework future 版本的一部分?
open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =
member this.Delay f = Observable.Defer f
member this.Combine (xs: IObservable<_>, ys : IObservable<_>) =
Observable.merge xs ys
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x
yield! f (x + 1) }
let rec g x = rx { yield x
yield! g (x + 1) }
//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore
最佳答案
一个简短的答案是,Rx 框架不支持使用这样的递归模式生成可观察量,因此它不容易完成。用于 F# 序列的Combine
操作需要一些可观察量不提供的特殊处理。 Rx 框架可能希望您使用 Observable.Generate
生成可观察量,然后使用 LINQ 查询/F# 计算生成器来处理它们。
无论如何,这里有一些想法 -
首先,您需要将 Observable.merge
替换为 Observable.Concat
。第一个并行运行两个可观察量,而第二个首先从第一个可观察量产生所有值,然后从第二个可观察量产生值。进行此更改后,该代码片段在堆栈溢出之前至少会打印约 800 个数字。
堆栈溢出的原因是 Concat
创建一个调用 Concat
的可观察量来创建另一个调用 Concat
的可观察量,等等。一种方法解决这个问题的方法是添加一些同步。如果您使用的是 Windows 窗体,则可以修改 Delay
以便它在 GUI 线程上安排可观察对象(这会丢弃当前堆栈)。这是一个草图:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
要正确实现此功能,您必须编写自己的 Concat
方法,这非常复杂。这个想法是:
- Concat 返回一些特殊类型,例如
IConcatenatedObservable
- 当递归调用该方法时,您将创建一个相互引用的
IConcatenatedObservable
链 Concat
方法将查找该链,并且当存在例如三个对象,它会删除中间的一个(以始终保持链的长度最多为 2)。
这对于 StackOverflow 的答案来说有点太复杂,但对于 Rx 团队来说可能是一个有用的反馈。
关于f# - 如何更改 Rx Builder 实现来修复堆栈溢出异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6162288/