async
package 的文档将 withAsync
函数描述为:
Spawn an asynchronous action in a separate thread, and pass its Async handle to the supplied function. When the function returns or throws an exception, uninterruptibleCancel is called on the Async. This is a useful variant of async that ensures an Async is never left running unintentionally.
在过去的两个小时里我一直盯着它,但一直无法弄清楚如何启动一个监视器线程,它会产生多个工作线程,例如:
- 如果监视器线程死亡,所有工作线程都应该被杀死,
- 但是,如果任何工作线程死亡,其他工作线程都不应受到影响。应通知监视器,并且它应该能够重新启动工作线程.
最佳答案
看来我们需要两个函数:一个用于启动所有异步任务,另一个用于监视它们并在它们死亡时重新启动它们。
第一个可以这样写:
withAsyncMany :: [IO t] -> ([Async t] -> IO b) -> IO b
withAsyncMany [] f = f []
withAsyncMany (t:ts) f = withAsync t $ \a -> withAsyncMany ts (f . (a:))
如果我们使用 managed包,我们也可以这样写:
import Control.Monad.Managed (with,managed)
withAsyncMany' :: [IO t] -> ([Async t] -> IO b) -> IO b
withAsyncMany' = with . traverse (\t -> managed (withAsync t))
重启函数将循环异步列表,轮询其状态并在失败时更新它们:
{-# language NumDecimals #-}
import Control.Concurrent (threadDelay)
resurrect :: IO t -> [Async t] -> IO ()
resurrect restartAction = go []
where
go ts [] = do
threadDelay 1e6 -- wait a little before the next round of polling
go [] (reverse ts)
go past (a:pending) = do
status <- poll a -- has the task died, or finished?
case status of
Nothing -> go (a:past) pending
Just _ -> withAsync restartAction $ \a' -> go (a':past) pending
但是,我担心许多嵌套的 withAsyncs
可能会导致某种类型的资源泄漏(因为必须为每个 withAsync
安装某种异常处理程序来通知子线程以防父线程死亡)。
因此,在这种情况下,最好使用普通的 async
生成工作线程,将 Async
集合存储到某种可变引用中,然后安装单个监视器线程中的异常处理程序,它将遍历终止每个任务的容器。
关于multithreading - 如何将父异步与多个子异步链接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46776954/