当 Haskell 中的异步 a
列表可用时,您将如何收集它们的结果?这个想法是一旦异步任务的结果可用就开始处理它们。
我能想到的最好的方法是以下函数:
collect :: [Async a] -> IO [a]
collect [] = return []
collect asyncs = do
(a, r) <- waitAny asyncs
rs <- collect (filter (/= a) asyncs)
return (r:rs)
但是,此函数没有表现出所需的行为,因为正如下面的评论所指出的,它直到所有异步任务完成后才返回。此外,由于我在每个递归步骤中过滤列表,因此 collect
的运行时间为 O(n^2)
。这可以通过使用更有效的结构来改进(并且可能对列表中 Async
值的位置进行索引)。
也许有库函数可以处理这个问题,但我在 Control.Concurrent.Async
模块中找不到它们,我想知道为什么。
编辑:在更仔细地思考问题之后,我想知道这样的函数是否是一个好主意。我可以在异步任务上使用 fmap
。也许在别无选择的情况下等待结果是更好的做法。
最佳答案
正如我在 my other answer 中提到的,当异步列表可用时,最好使用流处理库来将结果从异步列表中流出来。这是使用 pipes
的示例.
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Functor (($>))
import Pipes
import Pipes.Concurrent -- from the pipes-concurrency package
import qualified Pipes.Prelude as P
asCompleted :: MonadIO m => [Async a] -> Producer a m ()
asCompleted asyncs = do
(o, i, seal) <- liftIO $ spawn' unbounded
liftIO $ forkIO $ do
forConcurrently asyncs (\async -> atomically $ waitSTM async >>= send o)
atomically seal
fromInput i
main = do
actions <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"]
runEffect $ asCompleted actions >-> P.print
-- after one second, prints "foo", then "bar" a second later
使用pipes-concurrency
,我们spawn'
一个Output
-Input
配对并立即将输入
转换为 Producer
使用fromInput
。异步地,我们send
元素可用时。当所有Async
完成后,我们密封
收件箱以关闭Producer
。
关于multithreading - 在异步结果可用时收集它们,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38813012/