f# 邮箱处理器 - 无需等待发送即可回复

标签 f# agent mailboxprocessor

我正在使用代理 (MailboxProcessor) 在需要响应的地方进行一些有状态的处理。

  • 来电者使用 MailboxProcessor.PostAndAsyncReply 发布消息
  • 在代理中,响应以 AsyncReplyChannel.Reply 给出。

  • 但是,我通过查看 f# 源代码发现,在传递响应之前,代理不会处理下一条消息。总的来说,这是一件好事。但就我而言,代理继续处理消息比等待响应传递更可取。

    做这样的事情来提供响应有问题吗? (或者有更好的选择吗?)
    async { replyChannel.Reply response } |> Async.Start
    

    我意识到这种方法并不能保证响应会按顺序传递。我没问题。

    引用示例
    // agent code
    let doWork data =
        async { ... ; return response }
    
    let rec loop ( inbox : MailboxProcessor<_> ) =
        async {
            let! msg = inbox.Receive()
            match msg with
            | None ->
                return ()
    
            | Some ( data, replyChannel ) ->
                let! response = doWork data
                replyChannel.Reply response (* waits for delivery, vs below *)
                // async { replyChannel.Reply response } |> Async.Start
                return! loop inbox
        }
    
    let agent =
        MailboxProcessor.Start(loop)
    
    // caller code
    async {
        let! response =
            agent.PostAndAsyncReply(fun replyChannel -> Some (data, replyChannel))
        ...
    }
    

    最佳答案

    FSharp.Control.AsyncSeq在邮箱处理器上放置一张更友好的面孔。异步序列更容易遵循,但是默认实现映射并行具有与所述相同的问题,等待序列中的前一个元素被映射以保留顺序。

    所以我创建了一个新函数 taht 只是原始的 AsyncSeq.mapAsyncParallel,经过修改使其不再是真正的映射,因为它是无序的,但它确实映射了所有内容,并且懒惰的 seq 会随着工作的完成而进展。

    Full Source for AsyncSeq.mapAsyncParallelUnordered

    let mapAsyncParallelUnordered (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
      use mb = MailboxProcessor.Start (fun _ -> async.Return())
      let! err =
        s 
        |> AsyncSeq.iterAsyncParallel (fun a -> async {
          let! b = f a
          mb.Post (Some b) })
        |> Async.map (fun _ -> mb.Post None)
        |> Async.StartChildAsTask
      yield! 
        AsyncSeq.replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
      }
    

    下面是我如何在使用 SSLlabs 免费且速度非常慢的 api 的工具中使用它的示例,该 api 很容易过载。 parallelProcessHost返回一个懒惰的 AsyncSeq这是由 webapi 请求生成的,所以 AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync实际运行请求并允许控制台在传入时打印结果,而与发送的顺序无关。

    Full Source
    let! es = 
        hosts
        |> Seq.indexed
        |> AsyncSeq.ofSeq
        |> AsyncSeq.map parallelProcessHost
        |> AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
        |> AsyncSeq.indexed
        |> AsyncSeq.map (fun (i, tail) -> (consoleN "-- %d of %i --- %O --" (i+1L) totalHosts (DateTime.UtcNow - startTime)) :: tail )
        |> AsyncSeq.collect AsyncSeq.ofSeq
        |> AsyncSeq.map stdoutOrStatus //Write out to console
        |> AsyncSeq.fold (|||) ErrorStatus.Okay
    

    关于f# 邮箱处理器 - 无需等待发送即可回复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42118774/

    相关文章:

    methods - F# - 调用方法并分配给构造函数中的属性

    java - 如何从普通(非代理)java 类执行 JADE 代理?

    mercurial - 无法在路径 'hg' 找到 mercurial 可执行文件

    algorithm - 捕食者模拟

    events - F# 使用代理将历史事件与两个代理之间的模拟交互交错

    f# - Xamarin Android : F# icon not being picked up by Resources

    optimization - F# "for loop"优化

    f# - 在 F# 中没有值的成员

    asynchronous - 为什么在异步计算表达式中使用 "use"绑定(bind)时资源的处理会延迟?

    asynchronous - FSharp 中多状态代理编程的其他示例吗?