我编写了一个程序来计算语料库中 NGram 的频率。我已经有一个函数,它使用 token 流并生成一个订单的 NGram:
ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
目前我只能将一个流消费者连接到流源:
tokens --- trigrams --- countFreq
如何将多个流消费者连接到同一个流源? 我想要这样的东西:
.--- unigrams --- countFreq
|--- bigrams --- countFreq
tokens ----|--- trigrams --- countFreq
'--- ... --- countFreq
一个优点是并行运行每个消费者
编辑: 感谢 Petr,我想出了这个解决方案
spawnMultiple orders = do
chan <- atomically newBroadcastTMChan
results <- forM orders $ \_ -> newEmptyMVar
threads <- forM (zip results orders) $
forkIO . uncurry (sink chan)
forkIO . runResourceT $ sourceFile "test.txt"
$$ javascriptTokenizer
=$ sinkTMChan chan
forM results readMVar
where
sink chan result n = do
chan' <- atomically $ dupTMChan chan
freqs <- runResourceT $ sourceTMChan chan'
$$ ngram n
=$ frequencies
putMVar result freqs
最佳答案
我假设您希望所有接收器接收所有值。
我建议:
- 使用
newBroadcastTMChan
创建一个新 channelControl.Concurrent.STM.TMChan
(stm-chans)。 - 使用此 channel 通过
sinkTBMChan
构建接收器来自您的主要生产者的Data.Conduit.TMChan
(stm-conduit)。 - 对于每个客户端,使用
dupTMChan
创建自己的副本以供读取。启动一个新线程,使用sourceTBMChan
读取此副本。 - 从您的帖子中收集结果。
- 确保您的客户端能够以与生成数据一样快的速度读取数据,否则可能会出现堆溢出。
(我还没有尝试过,请让我们知道它是如何工作的。)
<小时/>更新:收集结果的一种方法是创建 MVar
对于每个消费者线程。他们每个人都会在完成后 putMVar
其结果。您的主线程将对所有这些 MVar 进行 takeMVar
,从而等待每个线程完成。例如,如果 vars
是 MVar
的列表,主线程将发出 mapM takeMVar vars
来收集所有结果。
关于haskell - 管道:多个流消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17931053/