haskell - Pipes/conduit试图解决什么问题

标签 haskell pipe conduit haskell-pipes

我见过有人推荐管道/管道库来执行各种惰性 IO 相关任务。这些库到底解决了什么问题?

此外,当我尝试使用一些 hackage 相关的库时,很可能存在三个不同的版本。示例:

这让我很困惑。对于我的解析任务,我应该使用 attoparsec 还是 Pipes-attoparsec/attoparsec-conduit?与普通的 attoparsec 相比,管道/导管版本给我带来了什么好处?

最佳答案

惰性 IO

惰性 IO 的工作原理如下

readFile :: FilePath -> IO ByteString

其中 ByteString 保证只能逐 block 读取。为此,我们(几乎)可以写

-- given `readChunk` which reads a chunk beginning at n
readChunk :: FilePath -> Int -> IO (Int, ByteString)

readFile fp = readChunks 0 where
  readChunks n = do
    (n', chunk) <- readChunk fp n
    chunks      <- readChunks n'
    return (chunk <> chunks)

但在这里我们注意到,IO 操作 readChunks n' 是在返回可用的 chunk 部分结果之前执行的。这意味着我们一点也不懒惰。为了解决这个问题,我们使用 unsafeInterleaveIO

readFile fp = readChunks 0 where
  readChunks n = do
    (n', chunk) <- readChunk fp n
    chunks      <- unsafeInterleaveIO (readChunks n')
    return (chunk <> chunks)

这会导致 readChunks n' 立即返回,仅当强制执行该 thunk 时才会执行 IO 操作。

这是危险的部分:通过使用 unsafeInterleaveIO,我们将一堆 IO 操作延迟到了 future 的不确定点,这些操作取决于我们如何使用我们的 block ByteString

修复协程问题

我们想要做的是在对 readChunk 的调用和对 readChunks 的递归之间插入一个 block 处理步骤。

readFileCo :: Monoid a => FilePath -> (ByteString -> IO a) -> IO a
readFileCo fp action = readChunks 0 where
  readChunks n = do
    (n', chunk) <- readChunk fp n
    a           <- action chunk
    as          <- readChunks n'
    return (a <> as)

现在我们有机会在加载每个小块后执行任意 IO 操作。这让我们可以增量地完成更多工作,而无需将 ByteString 完全加载到内存中。不幸的是,它的组合性不是很好——我们需要构建我们的消费action并将其传递给我们的ByteString生产者才能运行。

基于管道的 IO

这本质上就是管道所解决的问题——它使我们能够轻松地编写有效的协同例程。例如,我们现在将文件读取器编写为 Producer,可以将其视为在其效果最终运行时“流式传输”文件 block 。

produceFile :: FilePath -> Producer ByteString IO ()
produceFile fp = produce 0 where
  produce n = do
    (n', chunk) <- liftIO (readChunk fp n)
    yield chunk
    produce n'

请注意此代码与上面的 readFileCo 之间的相似之处 - 我们只需将对协程操作的调用替换为 yield 我们的 chunk到目前为止已经制作完成。对 yield 的调用构建了一个 Producer 类型,而不是我们可以与其他 Pipe 组合的原始 IO 操作类型以构建一个称为 Effect IO () 的良好消费管道。

所有这些管道构建都是静态完成的,而不实际调用任何 IO 操作。这就是 pipes 让您更轻松地编写协程的方式。当我们在 main IO 操作中调用 runEffect 时,所有效果都会立即触发。

runEffect :: Effect IO () -> IO ()

阿托秒差距

那么为什么要将 attoparsec 插入 pipes 中?嗯,attoparsec 针对延迟解析进行了优化。如果您以有效的方式生成输入到 attoparsec 解析器的 block ,那么您将陷入僵局。你可以

  1. 使用严格的 IO 并将整个字符串加载到内存中,以便解析器延迟使用它。这很简单、可预测,但效率低下。
  2. 使用惰性 IO,并且无法根据已解析项的消耗计划来推断生产 IO 效果何时实际运行,从而导致可能的资源泄漏或关闭句柄异常。这比(1)更有效,但很容易变得不可预测;或者,
  3. 使用pipes(或conduit)构建一个协程系统,其中包括惰性attoparsec解析器,允许它在尽可能少的情况下运行根据需要输入,同时在整个流中尽可能延迟地生成解析值。

关于haskell - Pipes/conduit试图解决什么问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22742001/

相关文章:

c - 通过父级读取数据并使用 c 中的管道将其发送给子级

c - 重定向已杀死的可执行文件的标准输出

haskell - 使用 postgresql-simple 创建流式管道源

haskell - 为什么 Haskell 无法自动解析参数数量?

c - 在 CentOS 上增加管道内部缓冲区大小

haskell - wai 的意外管道行为

string - 通过 Haskell Conduit 重用 sinkFile

haskell - 实例声明中的非法类型签名

haskell - 如何使用管道对从文件中读取的行进行编号?

Haskell 逆波兰表示法