比如说,我想并行折叠
幺半群。我的电脑有8核。我有这个函数可以将列表拆分为大小相等的较小列表(具有有界模偏差):
import Data.List
parallelize :: Int -> [a] -> [[a]]
parallelize 0 _ = []
parallelize n [] = replicate n []
parallelize n xs = let
(us,vs) = splitAt (quot (length xs) n) xs
in us : parallelize (n-1) vs
我制作的并行fold
的第一个版本是:
import Control.Concurrent
import Control.Concurrent.QSemN
import Data.Foldable
import Data.IORef
foldP :: Monoid m => [m] -> IO m
foldP xs = do
result <- newIORef mempty
sem <- newQSemN 0
n <- getNumCapabilities
let yss = parallelize n xs
for_ yss (\ys -> forkIO (modifyIORef result (fold ys <>) >> signalQSemN sem 1))
waitQSemN sem n
readIORef result
但是 IORef
和信号量的使用对我来说似乎很难看。所以我做了另一个版本:
import Data.Traversable
foldP :: Monoid m => [m] -> IO m
foldP xs = do
n <- getNumCapabilities
let yss = parallelize n xs
rs <- for yss (\ys -> runInUnboundThread (return (fold ys)))
return (fold rs)
我使用的测试代码是:
import Data.Monoid
import System.CPUTime
main :: IO ()
main = do
start <- getCPUTime
Product result <- foldP (fmap Product [1 .. 100])
end <- getCPUTime
putStrLn ("Time took: " ++ show (end - start) ++ "ps.")
putStrLn ("Result: " ++ show result)
foldP
的第二个版本优于第一个版本。当我使用 runInBoundThread
而不是 runInUnboundThread
时,它变得更快。
这些性能差异是由什么造成的?
最佳答案
TLDR;使用fold
函数来自 massiv
包,您可能会得到 Haskell 中最有效的解决方案。
首先我想说的是,当人们尝试实现这样的并发模式时,首先忘记的是异常处理。在问题的解决方案中,异常处理不存在,因此它是完全错误的。因此,我建议使用常见并发模式的现有实现。 async
是用于并发的 goto 库,但对于这种用例,它不是最有效的解决方案。
这个特殊的例子可以很容易地用 scheduler
来解决。包,事实上它正是它设计的目的。以下是如何使用它来实现幺半群的折叠:
import Control.Scheduler
import Control.Monad.IO.Unlift
foldP :: (MonadUnliftIO m, Monoid n) => Comp -> [n] -> m n
foldP comp xs = do
rs <-
withScheduler comp $ \scheduler ->
mapM_ (scheduleWork scheduler . pure . fold) (parallelize (numWorkers scheduler) xs)
pure $ fold rs
有关最佳并行化策略的说明,请参阅 Comp
类型。根据我在实践中发现的情况,Par
通常效果最好,因为它将使用使用 forkOn
请注意,parallelize
函数的实现效率低下且危险,最好这样写:
parallelize :: Int -> [a] -> [[a]]
parallelize n' xs' = go 0 id xs'
where
n = max 1 n'
-- at least two elements make sense to get benefit of parallel fold
k = max 2 $ quot (length xs') n
go i acc xs
| null xs = acc []
| i < n =
case splitAt k xs of
(ls, rs) -> go (i + 1) (acc . (ls :)) rs
| otherwise = acc . (xs:) $ []
还有一点建议是,列表总体上远非并行化和效率的理想数据结构。为了在并行计算之前将列表分割成 block ,您必须使用并行化来遍历数据结构,如果您要使用数组,则可以避免这种情况。我得到的是使用数组,正如本答案开头所建议的那样。
关于performance - runInBoundThread 是并行性的最佳工具吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62725053/