haskell - 管道:多个流消费者

标签 haskell nlp conduit

我编写了一个程序来计算语料库中 NGram 的频率。我已经有一个函数,它使用 token 流并生成一个订单的 NGram:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

目前我只能将一个流消费者连接到流源:

tokens --- trigrams --- countFreq

如何将多个流消费者连接到同一个流源? 我想要这样的东西:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq

一个优点是并行运行每个消费者

编辑: 感谢 Petr,我想出了这个解决方案

spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)

    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan

    forM results readMVar

    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs

最佳答案

我假设您希望所有接收器接收所有值。

我建议:

  1. 使用newBroadcastTMChan创建一个新 channel Control.Concurrent.STM.TMChan (stm-chans)。
  2. 使用此 channel 通过 sinkTBMChan 构建接收器来自您的主要生产者的 Data.Conduit.TMChan (stm-conduit)。
  3. 对于每个客户端,使用dupTMChan 创建自己的副本以供读取。启动一个新线程,使用 sourceTBMChan 读取此副本。
  4. 从您的帖子中收集结果。
  5. 确保您的客户端能够以与生成数据一样快的速度读取数据,否则可能会出现堆溢出。

(我还没有尝试过,请让我们知道它是如何工作的。)

<小时/>

更新:收集结果的一种方法是创建 MVar对于每个消费者线程。他们每个人都会在完成后 putMVar 其结果。您的主线程将对所有这些 MVar 进行 takeMVar,从而等待每个线程完成。例如,如果 varsMVar 的列表,主线程将发出 mapM takeMVar vars 来收集所有结果。

关于haskell - 管道:多个流消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17931053/

相关文章:

haskell - 强制严格评估——我做错了什么?

syntax - 如何在一行中混合绑定(bind)(<-)和赋值(let)? (在 haskell )

haskell - 管道的剩菜有什么好处?

text - 基于机器学习的领域特定命名实体识别 (NER)?

python - Pandas 中的条件词频计数

haskell - Attoparsec:跳过括号中的术语?

haskell - STT导管如何吊装

functional-programming - 在 Haskell 函数定义中应用 DRY 的指南

haskell - 为什么这个 lambda 演算缩减器不将 succ 0 缩减为 1?

algorithm - 分词统计方法