haskell - 如何在 Haskell 的 TChan 上的生产者/消费者情况下限制生产者?

标签 haskell

我们有一些东西可以在 TChan 上转储值,然后由消费者处理这些值。但消费者无法跟上,因此当生产者在 channel 上倾倒大量内容时,我们会使用大量内存,但消费者却无法跟上。如果 channel 队列达到一定大小或某种程度,是否有一种直接的方法让生产者阻塞,以便我们可以让生产者等待消费者 catch ?

最佳答案

就像约翰的回答一样,我建议自己构建一个有界的 TChan。我的代码有所不同,因为它:

  1. 添加抽象(使 BTChan 成为 ADT)
  2. 由于读取 IO 中的当前大小而消除了极端情况。
  3. 在读取时尽量不要在 TVar 大小中构建 thunk(在写入时不太重要,因为 thunk 只能是“一层深度”——下一个操作总是需要评估大小)。
  4. 现已被黑客攻击:http://hackage.haskell.org/package/bounded-tchan

注意:老实说,如果我是你,我会忽略所有这些答案,而只是使用他的评论中链接的代码(除非它是糟糕的代码)。我敢打赌它的作用与我在这里所做的相同,但经过更多思考。

{-# LANGUAGE BangPatterns #-}
module BTChan
        ( BTChan
        , newBTChanIO
        , newBTChan
        , writeBTChan
        , readBTChan
        ) where

import Control.Concurrent.STM

data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar  Int)

-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
    szTV <- newTVarIO 0
    c    <- newTChanIO
    return (BTChan m c szTV)

newBTChan :: Int -> STM (BTChan a)
newBTChan m 
        | m < 1 = error "BTChan's can not have a maximum <= 0!"
        | otherwise = do
        szTV <- newTVar 0
        c    <- newTChan
        return (BTChan m c szTV)

writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
        sz <- readTVar szTV
        if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x

readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
        x <- readTChan c
        sz <- readTVar szTV
        let !sz' = sz - 1
        writeTVar szTV sz'
        return x

sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV

STM 程序员需要注意的一些事项:

  • 显式调用retry将会产生结果,将您的haskell线程置于阻塞状态,等待TVarTChan之一的状态更改以便可以重试。这就是避免检查 IO 中的值并使用 yield 函数的方法。
  • 与 MVar 一样,TVar 也可以引用 thunk,但这通常不是您想要的。也许有人应该制作一个 hackage 包,定义 STVarSTChanSBTChanBTChan(严格和/或有界 TVar)和 TChans)。
  • 实际上有必要编写 newBTChanIO 而不是利用 newBTChan,因为 new{TVar,TChan}IO 的实现是可以工作的即使在 unsafePerformIO 下,atomically 也无法做到这一点。

编辑:通过将 TVar 分为一个供读者使用,一个供作者使用,从而减少争用,您实际上可以获得 2-5 倍的性能提升(取决于您使用的边界)。使用标准进行验证。改进版本 0.2.1 已经在 hackage 中。

关于haskell - 如何在 Haskell 的 TChan 上的生产者/消费者情况下限制生产者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5277633/

相关文章:

haskell - 如何动态分配循环数据?

haskell - 如何将两个参数的 Haskell 函数重写为无点样式

haskell - 使用 GHC API 时查找 cabal 包

haskell - 为什么我的 Haskell 程序使用 `par` 没有产生任何 Spark ?

haskell - 使 : Could not find module 'System'

Haskell 方式将 [IO String] 加入 IO String

haskell - 为什么安全偏函数使用 Maybe 而不是泛化到任何 Monad

haskell - Haskell 中有什么好的记录处理技巧吗?

haskell - Haskell 中的帕斯卡三角形

Haskell 条件错误