haskell Pipes - 如何在字节串管道上重复执行 takeWhile 操作?

标签 haskell streaming haskell-pipes

我想做的是使用 takeWhile 按某个字符分割字节串。

import qualified Data.ByteString.Internal as BS (c2w, w2c)
import Pipes
import Pipes.ByteString as PB
import Pipes.GZip
import Pipes.Prelude as PP
import System.IO

newline = BS.c2w '\n'

splitter = PB.takeWhile (\myWord -> myWord /= newline)

myPipe fileHandle = PP.toListM $ decompress fileProducer >-> splitter
  where
    fileProducer = PB.fromHandle fileHandle       

run = do
  dat <- withFile "somefile.blob" ReadMode myPipe
  pure dat

这让我得到了第一行,但我真正想要的是有效地将每个 block 一次生成一个换行符。我该怎么做?

最佳答案

@Michael 的回答很好。我只是想说明一些 这里正在发生的使用模式。

( .lhs 位于 http://lpaste.net/165352 )

首先是一些导入:

 {-# LANGUAGE OverloadedStrings, NoMonomorphismRestriction #-}

 import Pipes
 import qualified Pipes.Prelude as PP
 import qualified Pipes.Group as PG
 import qualified Pipes.ByteString as PB
 import qualified Pipes.GZip as GZip
 import qualified Data.ByteString as BS
 import Lens.Family (view, over)
 import Control.Monad
 import System.IO

如果你查看 Pipes.ByteString 和 Pipes.GZip 中的函数 您会看到它们都属于以下类型模式:

  1. 制作人... -> FreeT(制作人...)...
  2. FreeT(制作人...)... -> 制作人...
  3. Lens'(制作人...)(FreeT(制作人...)...)
  4. 制片人 ... -> 制片人 ...

每个类别的功能示例:

  1. PB.words
  2. PG.concats
  3. PB.linesPB.chunksOfPB.splits、...
  4. GZip.compressGZip.decompress

以下是如何使用 PB.words 将输入流拆分为单词:

 prod = yield "this is\na test\nof the pipes\nprocessing\nsystem"

 t1 = runEffect $ (PG.concats . PB.words) prod >-> PP.print

使用类型 3 的函数——例如PB.lines,只需使用view Lens' 获取类型 1 的函数,然后与 PG.concats 组合:

 t2a = runEffect $ (PG.concats . view PB.lines) prod >-> PP.print

 t2b h = (PG.concats . view PB.lines) (PB.fromHandle h) >-> PP.print

 run2 = withFile "input" ReadMode (runEffect . t2b)

对于 Producer -> Producer 函数,只需使用普通函数应用程序即可:

 t3 h = GZip.decompress (PB.fromHandle h) >-> PP.print

 run3 = withFile "input.gz" ReadMode (runEffect . t3)

 t4 h = GZip.decompress (PB.fromHandle h) >-> PP.map BS.length >-> PP.print

 run4 = withFile "big.gz" ReadMode (runEffect . t4)

为了先解压再按行分割,我们嵌套了函数 应用:

 t5 h = (PG.concats . view PB.lines) ( GZip.decompress (PB.fromHandle h) )
          >-> PP.map BS.length >-> PP.print

 run5 = withFile "input.gz" ReadMode (runEffect . t5)

关于haskell Pipes - 如何在字节串管道上重复执行 takeWhile 操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37632027/

相关文章:

javascript - 不要在 Android 上加载时间流音频

haskell - 使用 Control.Concurrent.MonadIO 进行管道和 fork

haskell - 更好的 `:browse` 命令 (GHCi)

c - GHC/FFI : calling haskell module which imports haskell libraries from C

haskell - 使用 Cabal 和 GHC 构建库的差异

wcf - 从 WCF 服务向客户端发送大数据的最佳方式是什么?

node.js - 是否有适合 node.js 的类似 radio 的音频流解决方案?

haskell-pipes - 使用 haskell 管道字节串逐行迭代文件

Haskell Control.Proxy.TCP 生产者

haskell - 并发访问持久化数据库的规则是什么