haskell - 可靠地等待多个异步?

标签 haskell asynchronous concurrency

我的代码需要触发多个线程并跟踪哪些已完成,哪些仍在运行。我打算使用 waitAnywaitAnyCatch ,但在 documentation 中被以下内容抛弃了

If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.



如果情况确实如此,人们如何可靠地跟踪正在运行/退出的线程?

这是我的简化代码:
chan <- newChan
currentThreadsRef <- newIORef []

-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all threads references to currentThreadsRef
async $ do
  jobArgs <- readChan chan
  jobAsync <- async $ runJob jobArgs
  atomicallyModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())

-- wait for jobs to be finished, and remove the thread refernece
-- from currentThreadsRef 
waitForAllJobs currentJobsRef = do
  (readIORef currentJobsRef) >>= \case
    [] -> logDebug "All jobs exited"
    currentJobs -> do
      (exitedJob, jobResult) <- waitAnyCatch currentJobs
      atomicallyModifyIORef currentJobsRef $ \x -> (filter (/= exitedjob) x, ())
      logDebug $ "Job completed with result=" <> show result
      waitForAllJobs currentJobsRef

PS:虽然从我上面的简化代码中可能看不出来,但我不能简单地使用 mapConcurrently 是有原因的。在输入数据上。其实, async-pool 似乎很适合我的用例,但即使这样也有与 waitAny 相同的问题.

最佳答案

这是一个启动 1000 个异步的程序,所有异步都设置为在一秒钟内终止并在循环中等待它们。编译 ghc -O2 -threaded并使用 +RTS -N 运行,它在大约 1.5 秒内运行,并且没有一个异步被“丢失”:

import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as Set

main :: IO ()
main = do
  let n = 1000 :: Int
  asyncs0 <- mapM (\i -> async (threadDelay 1000000 >> return i)) [1..n]
  let loop :: Set.Set (Async Int) -> IO ()
      loop asyncs | null asyncs = return ()
                  | otherwise = do
                      (a, _i) <- waitAny (Set.toList asyncs)
                      loop (Set.delete a asyncs)
  loop (Set.fromList asyncs0)

因此,正如评论中提到的,文档指的是提供列表中第一个完成的异步是将“返回”的,但是如果多个异步已完成,则不会“忘记其他异步” ”。您只需要从列表中删除返回的 async 并重新轮询,您最终会得到它们。

因此,使用 waitAny 等待多个异步操作应该不会有任何问题。 .

关于haskell - 可靠地等待多个异步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59849091/

相关文章:

go - 为什么不能通过同时计算 slice 的不同部分来加速程序?

java - java中的http负载生成器

python - 从 Haskell 到函数式 Python

c# - 调用 .Show on new Window 时 UI 卡住

ios - 使用 NSURLConnection 异步下载 UITableView 的图像

javascript - 测试复杂的异步 Redux 操作

c# - 如何增加 .NET Core 中的传出 HTTP 请求配额?

haskell - 从字典和维度生成所有可能组合的功能性尾递归方式

haskell - cabal install MissingPy 找不到 Data.HashTable

haskell - 关于 scotty Haskell Web 框架的简单问题