haskell - Understanding memory usage of this Haskell program

Tags: haskell memory haskell-pipes

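I should preface this by saying I'm a beginner with Haskell and the pipes library. I'd like to understand what is causing the high memory usage of this program in the test function.

Specifically, in the fold that produces the r1 value in test, I see MyRecord values building up until the final result is produced, unless deepseq is used. On my sample dataset of roughly 500,000 lines / roughly 230 MB, memory usage grows to over 1.5 GB.

The fold that produces the r2 value runs in constant memory.

What I'd like to understand is:

1) What could be causing the buildup of MyRecord values in the first fold, and why does using deepseq fix it? I was pretty much throwing things at it at random until deepseq gave me constant memory usage, but I'd like to understand why it works. Is it possible to get constant memory usage without deepseq while still producing the same Maybe Int result type?

2) What is different about the second fold that keeps it from exhibiting the same problem?

I know that if I were working with plain integers rather than tuples I could use the built-in sum function from Pipes.Prelude, but eventually I will want to process the second element, which holds any parsing errors.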

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Test where

import           Control.Arrow
import           Control.DeepSeq
import           Control.Monad
import           Data.Aeson
import           Data.Function
import           Data.Maybe
import           Data.Monoid
import           Data.Text (Text)

import           Pipes
import qualified Pipes.Aeson as PA (DecodingError(..))
import qualified Pipes.Aeson.Unchecked as PA
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as P

import           System.IO
import           Control.Lens
import qualified Control.Foldl as Fold

data MyRecord = MyRecord
  { myRecordField1 :: !Text
  , myRecordField2 :: !Int
  , myRecordField3 :: !Text
  , myRecordField4 :: !Text
  , myRecordField5 :: !Text
  , myRecordField6 :: !Text
  , myRecordField7 :: !Text
  , myRecordField8 :: !Text
  , myRecordField9 :: !Text
  , myRecordField10 :: !Int
  , myRecordField11 :: !Text
  , myRecordField12 :: !Text
  , myRecordField13 :: !Text
  } deriving (Eq, Show)

instance FromJSON MyRecord where
  parseJSON (Object o) =
    MyRecord <$> o .: "field1" <*> o .: "field2" <*> o .: "field3" <*>
    o .: "field4" <*>
    o .: "field5" <*>
    o .: "field6" <*>
    o .: "field7" <*>
    o .: "field8" <*>
    o .: "field9" <*>
    (read <$> o .: "field10") <*>
    o .: "field11" <*>
    o .: "field12" <*>
    o .: "field13"
  parseJSON x = fail $ "MyRecord: expected Object, got: " <> show x

instance ToJSON MyRecord where
    toJSON _ = undefined

test :: IO ()
test = do
  withFile "some-file" ReadMode $ \hIn
  {-

      the pipeline is composed as follows:

      1 a producer reading a file with Pipes.ByteString, splitting chunks into lines,
        and parsing the lines as JSON to produce tuples of (Maybe MyRecord, Maybe
        ByteString), the second element being an error if parsing failed

      2 a pipe filtering that tuple on a field of Maybe MyRecord, passing matching
        (Maybe MyRecord, Maybe ByteString) downstream

      3 and a pipe that picks an Int field out of Maybe MyRecord, passing (Maybe Int,
        Maybe ByteString) downstream

      pipeline == 1 >-> 2 >-> 3

      memory profiling indicates the memory build up is due to accumulation of
      MyRecord "objects", and data types comprising their fields (mainly
      Text/ARR_WORDS)

  -}
   -> do
    let pipeline = f1 hIn >-> f2 >-> f3
    -- need to use deepseq to avoid leaking memory
    r1 <-
      P.fold
        (\acc (v, _) -> (+) <$> (acc `deepseq` acc) <*> pure (fromMaybe 0 v))
        (Just 0)
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r1
    hSeek hIn AbsoluteSeek 0
    -- this works just fine as is and streams in constant memory
    r2 <-
      P.fold
        (\acc v ->
           case fst v of
             Just x -> acc + x
             Nothing -> acc)
        0
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r2
    return ()
  return ()

f1
  :: (FromJSON a, MonadIO m)
  => Handle -> Producer (Maybe a, Maybe PB.ByteString) m ()
f1 hIn = PB.fromHandle hIn & asLines & resumingParser PA.decode

f2
  :: Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) IO r
f2 = filterRecords (("some value" ==) . myRecordField5)

f3 :: Pipe (Maybe MyRecord, d) (Maybe Int, d) IO r
f3 = P.map (first (fmap myRecordField10))

filterRecords
  :: Monad m
  => (MyRecord -> Bool)
  -> Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) m r
filterRecords predicate =
  for cat $ \(l, e) ->
    when (isNothing l || (predicate <$> l) == Just True) $ yield (l, e)

asLines
  :: Monad m
  => Producer PB.ByteString m x -> Producer PB.ByteString m x
asLines p = Fold.purely PG.folds Fold.mconcat (view PB.lines p)

parseRecords
  :: (Monad m, FromJSON a, ToJSON a)
  => Producer PB.ByteString m r
  -> Producer a m (Either (PA.DecodingError, Producer PB.ByteString m r) r)
parseRecords = view PA.decoded

resumingParser
  :: Monad m
  => PP.StateT (Producer a m r) m (Maybe (Either e b))
  -> Producer a m r
  -> Producer (Maybe b, Maybe a) m ()
resumingParser parser p = do
  (x, p') <- lift $ PP.runStateT parser p
  case x of
    Nothing -> return ()
    Just (Left _) -> do
      (x', p'') <- lift $ PP.runStateT PP.draw p'
      yield (Nothing, x')
      resumingParser parser p''
    Just (Right b) -> do
      yield (Just b, Nothing)
      resumingParser parser p'

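Best answer

As stated in the docs for Pipes.Prelude.fold (P.fold in the code above), the fold is strict. However, the strictness is implemented with $!, which only forces evaluation to WHNF (weak head normal form). WHNF is enough to fully evaluate a simple type like Int, but it is not strong enough to fully evaluate more complex types such as Maybe Int: for a Maybe Int it stops at the Just constructor and leaves the Int inside unevaluated.

Some examples: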

main1 = do
  let a = 3 + undefined
      b = seq a 10
  print b                -- error: Exception: Prelude.undefined

main2 = do
  let a = Just (3 + undefined)
      b = seq a 10
  print b                -- no exception
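
For contrast, here is a small sketch (not part of the original answer) that puts seq and deepseq side by side on the same value. The names whnfOnly and fullyForced are made up for illustration; deepseq, evaluate and try are the usual functions from Control.DeepSeq and Control.Exception.

```haskell
import Control.DeepSeq (deepseq)
import Control.Exception (SomeException, evaluate, try)

-- seq stops at the outermost constructor (WHNF), so the undefined
-- hidden inside the Just is never touched.
whnfOnly :: Int
whnfOnly = let a = Just (3 + undefined) :: Maybe Int in a `seq` 10

-- deepseq evaluates the whole structure (normal form), so the
-- undefined inside the Just is evaluated and throws.
fullyForced :: Int
fullyForced = let a = Just (3 + undefined) :: Maybe Int in a `deepseq` 10

main :: IO ()
main = do
  print whnfOnly                                  -- prints 10
  r <- try (evaluate fullyForced) :: IO (Either SomeException Int)
  putStrLn (either (const "deepseq hit undefined") show r)
```

The second line of output is the "deepseq hit undefined" message, which shows that deepseq reaches below the Just where seq does not.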

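In the first fold (r1), the variable acc is a Just wrapping one big thunk: the sum of all the elements. On each iteration acc goes from Just a to Just (a+b) to Just (a+b+c), and so on. The addition is not performed during the fold; it is only performed at the very end. The large memory usage comes from keeping this ever-growing sum in memory. Presumably that is also why the profile shows MyRecord values: each unevaluated term of the sum still refers to the record its Int would be read from, so the records cannot be collected until the sum is finally evaluated.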

In the second fold (r2), the accumulator is a plain Int, so on every iteration $! reduces the sum to an evaluated Int.
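
The difference can be reproduced without pipes. The sketch below is only an approximation of the situation in the question: it assumes Data.List.foldl' as a stand-in for P.fold (both force the accumulator to WHNF on each step), and Item, mkItems, stepWhnf, stepDeep and sumWith are hypothetical names.

```haskell
import Control.DeepSeq (force)
import Data.List (foldl')
import Data.Maybe (fromMaybe)

-- Hypothetical stand-in for what the pipeline yields: (value, parse error).
type Item = (Maybe Int, Maybe String)

-- Some sample items: 1..n followed by one failed parse.
mkItems :: Int -> [Item]
mkItems n = [(Just k, Nothing) | k <- [1 .. n]] ++ [(Nothing, Just "bad line")]

-- Same shape as the r1 step without deepseq. Forcing the result to WHNF
-- only reaches the Just constructor; the Int inside stays a chain of
-- suspended additions that grows by one per element.
stepWhnf :: Maybe Int -> Item -> Maybe Int
stepWhnf acc (v, _) = (+) <$> acc <*> pure (fromMaybe 0 v)

-- Fully evaluating the previous accumulator before building on it keeps
-- the chain at most one addition deep.
stepDeep :: Maybe Int -> Item -> Maybe Int
stepDeep acc (v, _) = (+) <$> force acc <*> pure (fromMaybe 0 v)

sumWith :: (Maybe Int -> Item -> Maybe Int) -> [Item] -> Maybe Int
sumWith step = foldl' step (Just 0)

main :: IO ()
main = do
  print (sumWith stepWhnf (mkItems 100000))
  print (sumWith stepDeep (mkItems 100000))
```

Both folds print the same Just value; what differs is how much unevaluated structure is alive while the fold runs, which a heap profile (for example +RTS -s or -hT) should make visible.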

Besides using deepseq, you can also use force:

force x = x `deepseq` x

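As mentioned in the deepseq docs, combining force with ViewPatterns (plus BangPatterns for the ! on the pattern) lets you write a pattern that fully evaluates a function argument:

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ViewPatterns #-}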

...
P.fold
  (\(force -> !acc) (v,_) -> (+) <$> acc <*> pure (fromMaybe 0 v))
  (Just 0)
  ...
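
On the question of whether constant memory is possible without deepseq: one option, sketched here as an assumption and not taken from the answer, is to make the step function itself strict in the Int by rebuilding the Just with $!. strictStep is a hypothetical name, and foldl' stands in for P.fold so the example runs without pipes.

```haskell
import Data.List (foldl')
import Data.Maybe (fromMaybe)

-- Hypothetical strict step. When the fold forces the result to WHNF,
-- ($!) evaluates the new total first and only then wraps it in Just,
-- so no chain of suspended additions can build up.
strictStep :: Maybe Int -> (Maybe Int, e) -> Maybe Int
strictStep (Just n) (v, _) = Just $! n + fromMaybe 0 v
strictStep Nothing  _      = Nothing

main :: IO ()
main = do
  let items = [(Just 1, Nothing), (Nothing, Just "bad line"), (Just 2, Nothing)]
  print (foldl' strictStep (Just 0) items)   -- prints Just 3
```

Since P.fold takes a step function of the same shape, strictStep should be usable as P.fold strictStep (Just 0) id pipeline, keeping the Maybe Int result type.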

About haskell - Understanding memory usage of this Haskell program: a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39354139/
