multithreading - 在异步结果可用时收集它们

标签 multithreading haskell asynchronous

当 Haskell 中的异步 a 列表可用时,您将如何收集它们的结果?这个想法是一旦异步任务的结果可用就开始处理它们。

我能想到的最好的方法是以下函数:

collect :: [Async a] -> IO [a]
collect [] = return []
collect asyncs = do
  (a, r) <- waitAny asyncs
  rs <- collect (filter (/= a) asyncs)
  return (r:rs)

但是,此函数没有表现出所需的行为,因为正如下面的评论所指出的,它直到所有异步任务完成后才返回。此外,由于我在每个递归步骤中过滤列表,因此 collect 的运行时间为 O(n^2)。这可以通过使用更有效的结构来改进(并且可能对列表中 Async 值的位置进行索引)。

也许有库函数可以处理这个问题,但我在 Control.Concurrent.Async 模块中找不到它们,我想知道为什么。

编辑:在更仔细地思考问题之后,我想知道这样的函数是否是一个好主意。我可以在异步任务上使用 fmap 。也许在别无选择的情况下等待结果是更好的做法。

最佳答案

正如我在 my other answer 中提到的,当异步列表可用时,最好使用流处理库来将结果从异步列表中流出来。这是使用 pipes 的示例.

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Functor (($>))
import Pipes
import Pipes.Concurrent  -- from the pipes-concurrency package
import qualified Pipes.Prelude as P


asCompleted :: MonadIO m => [Async a] -> Producer a m ()
asCompleted asyncs = do
    (o, i, seal) <- liftIO $ spawn' unbounded
    liftIO $ forkIO $ do
        forConcurrently asyncs (\async -> atomically $ waitSTM async >>= send o)
        atomically seal
    fromInput i

main = do
    actions <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"]
    runEffect $ asCompleted actions >-> P.print
-- after one second, prints "foo", then "bar" a second later

使用pipes-concurrency ,我们spawn'一个Output -Input配对并立即将输入转换为 Producer使用fromInput 。异步地,我们send元素可用时。当所有Async完成后,我们密封收件箱以关闭Producer

关于multithreading - 在异步结果可用时收集它们,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38813012/

相关文章:

java - 动态列表从未加载 - 使用 parse.com

multithreading - CDI 多线程

java - 使用jboss5线程池启动线程

c++ - 需要有关多线程合并排序的建议

haskell - 如何让 GHC 相信递归类型的类型相等性

haskell - Haskell 中的二叉搜索树实现

ios - NSOperation 用户信息字典

pointers - 对相同数据和内存分配的引用

swift - UITableViewController 和预取

c++ - boost ASIO async_read_until 不编译 ASIO ssl 客户端示例