haskell - 概括一个函数来合并一组 Haskell 管道

标签 haskell haskell-pipes

我正在与 Haskell pipes package 合作.

我正在尝试使用pipes-concurrency将生产者列表合并在一起。

我想要达到的是:

merge :: MonadIO m => [Producer a m ()] -> Producer a m ()

因此给定一个生产者 s1 和另一个生产者 s2: r = merge [s1, s2] 这会产生以下行为:

s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|

按照我想出的教程页面中的代码:

mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    fork :: Output a -> Producer a IO () -> IO ()
    fork output producer = void $ forkIO $ do runEffect $ producer >-> toOutput output
                                              performGC

它按预期工作。

但是我很难概括事物。

我的尝试:

merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    runEffectIO :: Monad m => Effect m r -> IO (m r)
    runEffectIO e = do
        x <- evaluate $ runEffect e
        return x
    fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
                                       performGC

不幸的是,这个可以编译,但不会做太多其他事情。我猜我把 runEffectIO 弄得一团糟。我当前的其他方法 runEffectIO没有取得更好的结果。

程序:

main = do
    let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
    _ <- runEffect $ producer >-> taker 20
  where repeater :: Int -> Int -> Producer Int IO r
        repeater val delay = forever $ do
            lift $ threadDelay delay
            yield val
        taker :: Int -> Consumer Int IO ()
        taker 0 = return ()
        taker n = do
            val <- await
            liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
            taker $ n - 1

点击次数val <- await但没有到达liftIO $ putStrLn因此它不产生任何输出。但是它可以正常退出而不会挂起。

当我替换为 mergeIO 时对于 merge然后程序运行,我预计会输出 20 行。

最佳答案

虽然 MonadIO 不足以完成此操作,但 MonadBaseControl (来自 monad-control )被设计为允许在基本 monad 内嵌入任意转换器堆栈。伴侣包lifted-base提供了适用于变压器堆栈的 fork 版本。我整理了一个使用它来解决您的问题的示例 in the following Gist ,尽管主要的魔力是:

import qualified Control.Concurrent.Lifted as L
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer = L.fork $ do
    runEffect $ producer >-> toOutput output
    liftIO performGC

请注意,您应该了解以这种方式处理时单子(monad)状态会发生什么:对子线程中执行的任何可变状态的修改将仅与这些子线程隔离。换句话说,如果您使用 StateT,每个子线程都会以 fork 时上下文中的相同状态值开始,但随后您将拥有许多不更新的不同状态彼此。

有一个 appendix in the Yesod book关于 monad-control,尽管坦率地说它有点过时了。我只是不知道最近有什么教程。

关于haskell - 概括一个函数来合并一组 Haskell 管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22333846/

相关文章:

list - Haskell 中的过滤

list - Haskell - 如何返回网格中所有元素行的列表

haskell - 您将如何遍历目录并对所有文件执行某些功能并以内存有效的方式组合输出?

haskell - 如何将 `readfile` 函数的输出变成管道的源?

haskell - 如何使用管道检测输入结束

scala - 比较 Haskell 和 Scala 绑定(bind)/平面图示例

javascript - 使用来自 Haskell 的 javascript 库

c++ - 在 Haskell 中删除对象时调用函数

haskell - 如何将外部导出函数的参数传递到管道中?

json - 使用 Pipes.Aeson 在 Haskell 中对 JSON 进行流式解析