我正在寻找一种在二维数组上并行映射函数f :: a -> IO(b)
的方法,同时保留合理的内存消耗。
我还希望能够提供数组索引作为函数的参数,即映射g :: Int -> Int -> a -> IO(b)
,例如Data.Vector的imap
或Data.Map的mapWithKey
。
当前尝试(请参阅下文)具有严重的内存消耗,或者在运行时引发错误。
注意,实际上,我感兴趣的函数类型是h :: Int -> Int -> a -> Random b
,其中Random
表示Control.Monad.Random中的一些Random monad;我使用evalRandIO
将其移至IO monad。
尝试的解决方案:
假设我想将foo :: Int -> Int -> a -> IO(b)
函数映射到a
类型的2D元素数组上。 (此处a
和b
是特定类型;没有任何隐式通用量化。)
到目前为止,我已经尝试了以下方法:
import Control.Concurrent.Async(mapConcurrently)
indexedArray :: [[(Int,Int,a)]]
indexedArray = -- ...
mappedArray = mapConcurrently (traverse (\(x,y,a) -> foo x y a)) indexedArray
这种方法的问题是内存消耗超出了图表(例如4GB作为引用)。
正如答案中指出的那样,通过这种方法,我只评估并行的行,而不是所有元素,但这在实践中对我没有多大影响。
import qualified Data.Array.Repa as R
import Data.Array.Repa(Z(..), (:.)(..), U, DIM2)
array :: R.Array U DIM2 a
array = -- ...
mappedArray = R.traverse array id (\i (Z :. x :. y) -> unsafePerformIO $ foo x y (i (Z :. x :. y)))
result = R.computeP mappedArray
请注意,
R.traverse
不是Data.Traversable(traverse)
。由于Repa阵列不支持Data.Traversable(traverse)
,因此我无法以任何方式对IO操作进行排序,因此我必须使用unsafePerformIO
才能使用内置的"traverse"
功能。这种方法具有良好的性能和出色的内存消耗(供引用约50MB)。
但是,有一个问题,因为我始终遇到以下运行时错误:
thread blocked indefinitely in an MVar operation
。3a。 Data.Vector和Control.Parallel
本质上与Repa相同的方法会导致相同的错误
thread blocked indefinitely in an MVar operation
。我再次求助于使用
unsafePerformIO
作为Data.Vector向量没有可遍历的实例。 import qualified Data.Vector as V
import Control.Parallel.Strategies(using)
import Data.Vector.Strategies(parVector)
array :: V.Vector (V.Vector a)
array = -- ...
mappedArray = V.imap (\ y row -> V.imap (\x a -> unsafePerformIO $ foo x y a ) row ) `using` (parVector 1)
与Repa相比,内存消耗和性能稍差一些(仅供引用,约为100MB),但仍可比。
3b。 Data.Vector和Control.Concurrent.Async
如sheyll所建议,但是使用平面向量而不是嵌套向量。
import qualified Data.Vector.Unboxed as V
import qualified Data.Vector.Unboxed.Mutable as M
import Control.Concurrent.Async(forConcurrently_)
mappedFlattenedArray = do
flattenedMArray <- V.unsafeThaw $ -- ...
forConcurrently_ [0..w*h] (\i -> do v <- M.unsafeRead flattenedMArray i
let (y,x) = quotRem i w
v' <- foo x y v
M.unsafeWrite flattenedMArray i v' )
V.unsafeFreeze flattenedMArray
不幸的是,这种方法的内存消耗非常高(〜3GB)。我认为这是因为
forConcurrently_
会创建许多混音吗?我不确定如何避免此问题。根据Alec的建议,使用Data.Array数组的可遍历实例:
import qualified Data.Array.Unboxed as A
import Control.Concurrent.Async(mapConcurrently)
indexedArray :: A.Array (Int,Int) ((Int,Int),a)
indexedArray = -- ...
mappedArray = mapConcurrently (\((x,y),a) -> foo x y a) indexedArray
再一次,即使使用未装箱的阵列,内存消耗也非常高(〜3GB)。该问题可能与方法1和3b中的问题相同,堆积的thunk占用了大量内存。我不确定该如何解决。
Repa的总体性能和内存消耗似乎比其他任何方法都要好,我也很欣赏用于处理二维数组并能够映射使用索引的函数的内置功能。不幸的是,大多数时候我都会遇到上述运行时错误(但并非总是如此!)。
前面我说过,
foo
的返回类型是IO(b)
的唯一原因是由于不确定性。因此,我本以为可以将输出类型更改为某种Random
monad,而不是执行unsafePerformIO
,而只需执行具有给定种子的runRandom
即可。不幸的是,这没有解决问题,因为无论如何我一直收到错误thread blocked indefinitely in an MVar operation
。有什么办法可以挽救方法2(Repa)来避免此错误?还是有其他适用的方法?
我了解,一般而言,IO不能打破并行性,因为不能保证IO操作不会发生冲突,但是至少对于这种用例,我认为应该可以解决。 (请参阅:Why is there no mapM for repa arrays?)
另请参阅以下问题:Parallel mapM on Repa arrays。但是请注意,我事先不知道我的函数
foo
需要多少个随机数。
最佳答案
您的第一种方法可能是您想要的,但没有链接列表。请注意,类型 mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
允许您对等于traverse
的任何内容(包括 Traversable
)进行相当于并行Array
的操作(我在这里建议对Array
进行Vector
的使用,只是因为它更适合于多个维度)。
import Control.Concurrent.Async (mapConcurrently)
import Data.Array
indexedArray :: Array (Int,Int) (Int,Int,a)
indexedArray = ...
mappedArray = mapConcurrently (\(x,y,a) -> foo x y a) indexedArray
另外,请注意,您以前使用嵌套列表的方法仅并行化每个子列表的
traverse
-不会并行处理整个事情。
关于arrays - 数组上不确定函数的异步映射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42309713/