f# - F#中的异步屏障

标签 f# functional-programming synchronization multitasking

我用 F# 编写了一个程序,它异步列出磁盘上的所有目录。异步任务列出给定目录中的所有文件,并创建单独的异步任务(守护进程:我使用 Async.Start 启动它们)以列出子目录。它们都将结果传送到中央 MailboxProcessor。

我的问题是,如何检测到所有守护进程任务都已完成并且不会再有文件到达。基本上,我需要为我的首要任务的(直接和间接)子项的所有任务设置障碍。我在 F# 的异步模型中找不到类似的东西。

我所做的是创建一个单独的 MailboxProcessor,在其中注册每个任务的开始和终止。当事件计数变为零时,我就完成了。但我对这个解决方案并不满意。还有其他建议吗?

最佳答案

您是否尝试过使用 Async.Parallel ?也就是说,而不是 Async.Start每个子目录,只需通过 Async.Parallel 将子目录任务合并到一个异步中.然后你会得到一个(嵌套的)fork-join 任务,你可以 RunSynchronously并等待最终结果。

编辑

这是一些近似代码,它显示了要点,如果不是完整的细节:

open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! msg = mbox.Receive()
            printfn "%s" msg
    })

let rec traverse dir =
    async {
        agent.Post(dir)
        let subDirs = Directory.EnumerateDirectories(dir)
        return! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
    }

traverse "d:\\" |> Async.RunSynchronously
// now all will be traversed, 
// though Post-ed messages to agent may still be in flight

编辑 2

这是使用回复的等待版本:
open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! dir, (replyChannel:AsyncReplyChannel<unit>) = mbox.Receive()
            printfn "%s" dir
            replyChannel.Reply()
    })

let rec traverse dir =
    async {
        let r = agent.PostAndAsyncReply(fun replyChannel -> dir, replyChannel)
        let subDirs = Directory.EnumerateDirectories(dir)
        do! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
        do! r // wait for Post to finish
    }

traverse "c:\\Projects\\" |> Async.RunSynchronously
// now all will be traversed to completion 

关于f# - F#中的异步屏障,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4608860/

相关文章:

scala - Scala 中 Universal trait 的用途是什么?

Java:作用于同一元素的锁定函数

java - 尝试使用生产者和消费者模式打印斐波那契数列

c# - 从 F# 中的 map 中删除所有内容

c# - F# - "self-filling"类型属性

F# 从元组列表中提取元组

android - 如何将接收图像的线程与url同步

f# - 我尝试在 if 语句中连接有什么问题

scala - 在 Scala 中将 Option[T] 转换为 Option[U]

functional-programming - 迭代连续折叠结果的惯用和功能方法是什么?