asynchronous - 在 F# 中混合 IObservable 和 Async<'a>

标签 asynchronous f# system.reactive

我有一个库提供的IObservable,它监听来自外部服务的事件:

let startObservable () : IObservable<'a> = failwith "Given"

对于每个接收到的事件,我想执行一个返回 Async 的操作:

let action (item: 'a) : Async<unit> = failwith "Given"

我正在尝试实现一个处理器

let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable

我已经编写了 mapAsyncAwaitObservable:理想情况下它们将由某个库提供,但我目前还没有找到。

额外要求:

  • 应按顺序执行操作,以便在处理前一个事件时缓冲后续事件。

  • 如果一个操作抛出错误,我希望我的处理器完成。否则,它永远不会完成。

  • 应遵守通过 Async.Start 传递的取消 token 。

关于我应该使用的库有什么提示吗?

最佳答案

由于您要将基于推的模型 (IObservable<>) 转换为基于拉的模型 (Async<>),因此您需要排队以缓冲来自 observable 的数据。如果队列有大小限制——说实话。应该是为了使整个管道安全,不会溢出内存 - 然后还需要缓冲区溢出策略。

  1. 一种方法是实现 MailboxProcessor<>和自定义可观察对象,它将向其发布数据。由于 MP 是原生 F# actor 实现,它能够使用队列进行有序处理以缓冲峰值。
  2. 另一种选择是使用 FSharp.Control.AsyncSeq (特别是 AsyncSeq.ofObservableBuffered 函数),它将 observable 转换为基于拉取的异步可枚举 - 在其下方从第一点开始使用邮箱处理器:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action
    

关于asynchronous - 在 F# 中混合 IObservable 和 Async<'a>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52292043/

相关文章:

f# - 同时使用构建器类的相同实例是否会引起任何副作用?

.net - 如何在 F# 中声明可通过 WebJob 的 JobHost.CallAsync 调用的函数?

c# - 在 F# 中使用绑定(bind)接口(interface)

c# - 从 Event 创建 Observable 时出错

system.reactive - 什么是热观测值和冷观测值?

c# - 可观察的取消标记

java - 如何正确检测 servlet 规范 3 中的客户端断开连接?

c# - 如何使用 call/cc 实现 c# 5.0 中的新异步功能?

javascript - 在 AngularJS 中异步调用 hprose.httpclient

c# - 为什么需要 EndExecuteNonQuery()?