我有一个库提供的IObservable
,它监听来自外部服务的事件:
let startObservable () : IObservable<'a> = failwith "Given"
对于每个接收到的事件,我想执行一个返回 Async
的操作:
let action (item: 'a) : Async<unit> = failwith "Given"
我正在尝试实现一个处理器
let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable
我已经编写了 mapAsync
和 AwaitObservable
:理想情况下它们将由某个库提供,但我目前还没有找到。
额外要求:
应按顺序执行操作,以便在处理前一个事件时缓冲后续事件。
如果一个操作抛出错误,我希望我的处理器完成。否则,它永远不会完成。
应遵守通过
Async.Start
传递的取消 token 。
关于我应该使用的库有什么提示吗?
最佳答案
由于您要将基于推的模型 (IObservable<>
) 转换为基于拉的模型 (Async<>
),因此您需要排队以缓冲来自 observable 的数据。如果队列有大小限制——说实话。应该是为了使整个管道安全,不会溢出内存 - 然后还需要缓冲区溢出策略。
- 一种方法是实现
MailboxProcessor<>
和自定义可观察对象,它将向其发布数据。由于 MP 是原生 F# actor 实现,它能够使用队列进行有序处理以缓冲峰值。 另一种选择是使用
FSharp.Control.AsyncSeq
(特别是 AsyncSeq.ofObservableBuffered 函数),它将 observable 转换为基于拉取的异步可枚举 - 在其下方从第一点开始使用邮箱处理器:startObservable() |> AsyncSeq.ofObservableBuffered |> AsyncSeq.iterAsync action
关于asynchronous - 在 F# 中混合 IObservable 和 Async<'a>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52292043/