haskell - 提高基于线路的导管性能的方法

标签 haskell io conduit

我使用 haskell 进行基于行的数据处理,即可以应用的任务 sed , awk和类似的工具。作为一个简单的例子,让我们在前面加上 000从标准输入到每一行。

我有三种替代方法来完成这项任务:

  • 带有惰性的惰性 IO ByteString小号
  • 基于线路的导管。
  • 纯严格的基于 block 的导管ByteString内加工。
  • example.hs :
    {-# LANGUAGE NoImplicitPrelude #-}
    {-# LANGUAGE OverloadedStrings #-}
    {-# LANGUAGE FlexibleContexts #-}
    
    import ClassyPrelude.Conduit
    import qualified Data.ByteString.Char8 as B8
    import qualified Data.ByteString.Lazy.Char8 as BL8
    import qualified Data.Conduit.Binary as CB
    
    main = do
      [arg] <- getArgs
      case arg of
    
        "lazy" -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines
    
        "lines" -> runConduitRes $ stdinC .| CB.lines .|
          mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC
    
        "chunks" -> runConduitRes $ stdinC .| lineChunksC .|
          mapC (B8.unlines . (map ("000" ++)) . B8.lines) .| stdoutC
    
    
    lineChunksC :: Monad m => Conduit ByteString m ByteString
    lineChunksC = await >>= maybe (return ()) go
      where
      go acc = if
        | Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
        | otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
        where
        go' (this, next) = let acc' = acc ++ this in if null next then go acc' else yield acc' >> go next
    
    breakAfterEOL :: ByteString -> (ByteString, ByteString)
    breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
    
    $ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example
    $ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done
    lazy
    real 2.99
    user 3.06
    sys 0.07
    
    lines
    real 3.30
    user 3.36
    sys 0.06
    
    chunks
    real 1.83
    user 1.95
    sys 0.06
    

    (The results are consistent across multiple runs, and also hold for lines with several numbers).

    So chunks is 1.6x faster than lines which is a bit faster than lazy. This means that conduits can be faster than plain bytestrings, but the overhead of conduit pipes is too heavy when you split chunks into short lines.

    What I don't like about chunks approach is that it mixes both conduit and pure worlds, and it makes it harder to use it for more complex tasks.

    The question is, did I miss a simple and elegant solution which would allow me to write efficient code in same fashion as with lines approach?

    EDIT1: Per @Michael's suggestion I joined two mapC into one mapC (("000" ++). (snoc10)) in lines solution, to make number of pipes (.|) same between lines and chunks. This made it perform a bit better (down from 3.3s to 2.8s), but still significantly slower than chunks.

    Also I tried older Conduit.Binary.lines which Michael suggested in comments, and it also improves performance a bit, by ~0.1s.

    EDIT2: Fixed lineChunksC so it works with very small chunks, e.g.

    > runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList 
    ["\n","r\n","\n"]
    

    最佳答案

    我的猜测是,对于“线条”,mapC ("000" ++) .| mapC (`snoc` 10)部分正在做很多工作。

    连接几个严格的ByteStrings进入另一个严格的ByteString太贵了。将它们连接成一个懒惰的 ByteString往往更有效率。

    为避免此成本,您可以将每个部分单独生成为严格的 ByteString 下游。 (但请注意,我们不再谈论“线条”)。

    或者,将每个转换后的行作为惰性 ByteString下游。

    The question is, did I miss a simple and elegant solution which would allow me to write efficient code in same fashion as with lines approach?



    一些流媒体库有一个有趣的特性:您可以在流中分隔行并对其进行操作,而无需在任何时候将整行具体化到内存中。

    这里我使用 streamingstreaming-bytestring包,因为我更熟悉它们。

    在模块 Data.ByteString.Streaming.Char8 对于流式字节串,我们有 lines 功能:
    lines :: Monad m => ByteString m r -> Stream (ByteString m) m r
    

    lines turns a ByteString into a connected stream of ByteStrings at divide at newline characters. The resulting strings do not contain newlines. This is the genuinely streaming lines which only breaks chunks, and thus never increases the use of memory.



    它的要点是ByteString m r已经是流媒体类型了!所以这个版本的lines将流转换为“流的流”。而我们只能通过耗尽“当前流”(当前行)到达“下一个流”(下一行)。

    您的“行”示例可以写成:
    {-# language OverloadedStrings #-}
    module Main where
    
    import Control.Applicative ((*>))
    import Streaming
    import qualified Streaming.Prelude as S
    import qualified Data.ByteString.Streaming.Char8 as Q
    
    main :: IO ()
    main = Q.stdout
         . Q.unlines
         . S.maps (\line -> "000" *> line)
         . Q.lines
         $ Q.stdin
    

    关于haskell - 提高基于线路的导管性能的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40319710/

    相关文章:

    在 Haskell 中定义为 Maybe 的列表?为什么不?

    exception - 与 aeson/attoparsec 进行管道,一旦源没有更多数据,如何无异常地干净退出

    haskell - 这个简单的函数叫什么?

    Haskell Stack Ghci 测试套件

    C++ 以二进制模式读取文件。文件结束的问题

    haskell - 使用 haskell 删除 stdin 中的项目

    java - HTML 不将数据发送到 Java 文件

    haskell - 如何在 Haskell 中实现 `cat` ?

    haskell - 如何正确关闭与网络管道的网络连接?

    haskell - 如何使用 ((.).(.)) 中缀?