haskell - 使用组合器将管道重新分块成更大的 block

标签 haskell chunking conduit

我正在尝试构建一个 Conduit接收作为输入 ByteString s(每 block 大小约为 1kb)并作为输出连接 ByteString s 的 512kb block 。

这似乎应该很简单,但我遇到了很多麻烦,我尝试使用的大多数策略都只成功地将 block 分成更小的 block ,我没有成功连接更大的 block 。

我开始尝试 isolate ,然后 takeExactlyE最终 conduitVector ,但无济于事。最终我解决了这个问题:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

附言我决定不为这个函数拆分更大的 block ,但这只是一个方便的简化。

然而,考虑到所有处理分 block 的管道函数[1],这似乎非常冗长。 , 2 , 3 , 4 ]。请帮忙!使用组合器肯定有更好的方法来做到这一点,但我缺少一些直觉!

附言可以像我所做的那样对缓冲区使用惰性字节串吗?我有点不清楚字节串的内部表示以及这是否会有所帮助,特别是因为我使用的是 BL.length我想这可能会评估thunk吗?

结论

只是为了详细说明迈克尔的回答和评论,我最终得到了这个管道:
import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base) 
         => Int 
         -> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector

我的理解是vectorBuilder将支付早期连接较小块的成本,将聚合 block 生成为严格的字节串。

据我所知,当聚合的 block 非常大和/或馈送到像网络套接字这样的自然流接口(interface)时,可能需要一种产生惰性字节串 block (即“ block block ”)的替代实现。这是我对“惰性字节串”版本的最佳尝试:
import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index lazy
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize

最佳答案

这个怎么样?

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
    loop
  where
    loop = do
        lbs <- takeCE chunkSize =$= sinkLazy
        unless (null lbs) $ do
            yield lbs
            loop

main :: IO ()
main =
    yieldMany ["hello", "there", "world!"]
        $$ chunksOfAtLeast 3
        =$ mapM_C print

根据您的目标,您可以采取许多其他方法。如果你想有一个严格的缓冲区,那么使用 vectorBuilder 的 blaze-builder 会很有意义。但这会保留您已经拥有的相同类型签名。

关于haskell - 使用组合器将管道重新分块成更大的 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25428065/

相关文章:

haskell - 管道流中的并行处理

haskell - 表现出奇怪的行为

testing - Haskell:如何使用快速检查测试(响应式(Reactive))FSM?

haskell - Haskell 中的守卫,使用检查执行操作

haskell - 了解函数签名

java - http中的分块和流有什么区别?

delphi - 如何处理 AST 中的评论?

javascript - HTML5 File.slice 方法实际上在做什么?

python - 设计一个正则表达式来查找任何名词短语

haskell - 保留管道流的每个第 n 个元素