我的代码需要触发多个线程并跟踪哪些已完成,哪些仍在运行。我打算使用 waitAny
或 waitAnyCatch
,但在 documentation 中被以下内容抛弃了
If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.
如果情况确实如此,人们如何可靠地跟踪正在运行/退出的线程?
这是我的简化代码:
chan <- newChan
currentThreadsRef <- newIORef []
-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all threads references to currentThreadsRef
async $ do
jobArgs <- readChan chan
jobAsync <- async $ runJob jobArgs
atomicallyModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())
-- wait for jobs to be finished, and remove the thread refernece
-- from currentThreadsRef
waitForAllJobs currentJobsRef = do
(readIORef currentJobsRef) >>= \case
[] -> logDebug "All jobs exited"
currentJobs -> do
(exitedJob, jobResult) <- waitAnyCatch currentJobs
atomicallyModifyIORef currentJobsRef $ \x -> (filter (/= exitedjob) x, ())
logDebug $ "Job completed with result=" <> show result
waitForAllJobs currentJobsRef
PS:虽然从我上面的简化代码中可能看不出来,但我不能简单地使用
mapConcurrently
是有原因的。在输入数据上。其实, async-pool
似乎很适合我的用例,但即使这样也有与 waitAny
相同的问题.
最佳答案
这是一个启动 1000 个异步的程序,所有异步都设置为在一秒钟内终止并在循环中等待它们。编译 ghc -O2 -threaded
并使用 +RTS -N
运行,它在大约 1.5 秒内运行,并且没有一个异步被“丢失”:
import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as Set
main :: IO ()
main = do
let n = 1000 :: Int
asyncs0 <- mapM (\i -> async (threadDelay 1000000 >> return i)) [1..n]
let loop :: Set.Set (Async Int) -> IO ()
loop asyncs | null asyncs = return ()
| otherwise = do
(a, _i) <- waitAny (Set.toList asyncs)
loop (Set.delete a asyncs)
loop (Set.fromList asyncs0)
因此,正如评论中提到的,文档指的是提供列表中第一个完成的异步是将“返回”的,但是如果多个异步已完成,则不会“忘记其他异步” ”。您只需要从列表中删除返回的 async 并重新轮询,您最终会得到它们。
因此,使用
waitAny
等待多个异步操作应该不会有任何问题。 .
关于haskell - 可靠地等待多个异步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59849091/