我们有一些东西可以在 TChan 上转储值,然后由消费者处理这些值。但消费者无法跟上,因此当生产者在 channel 上倾倒大量内容时,我们会使用大量内存,但消费者却无法跟上。如果 channel 队列达到一定大小或某种程度,是否有一种直接的方法让生产者阻塞,以便我们可以让生产者等待消费者 catch ?
最佳答案
就像约翰的回答一样,我建议自己构建一个有界的 TChan。我的代码有所不同,因为它:
- 添加抽象(使
BTChan
成为 ADT) - 由于读取 IO 中的当前大小而消除了极端情况。
- 在读取时尽量不要在 TVar 大小中构建 thunk(在写入时不太重要,因为 thunk 只能是“一层深度”——下一个操作总是需要评估大小)。
- 现已被黑客攻击:http://hackage.haskell.org/package/bounded-tchan
注意:老实说,如果我是你,我会忽略所有这些答案,而只是使用他的评论中链接的代码(除非它是糟糕的代码)。我敢打赌它的作用与我在这里所做的相同,但经过更多思考。
{-# LANGUAGE BangPatterns #-}
module BTChan
( BTChan
, newBTChanIO
, newBTChan
, writeBTChan
, readBTChan
) where
import Control.Concurrent.STM
data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int)
-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
szTV <- newTVarIO 0
c <- newTChanIO
return (BTChan m c szTV)
newBTChan :: Int -> STM (BTChan a)
newBTChan m
| m < 1 = error "BTChan's can not have a maximum <= 0!"
| otherwise = do
szTV <- newTVar 0
c <- newTChan
return (BTChan m c szTV)
writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
sz <- readTVar szTV
if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x
readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
x <- readTChan c
sz <- readTVar szTV
let !sz' = sz - 1
writeTVar szTV sz'
return x
sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV
STM 程序员需要注意的一些事项:
- 显式调用
retry
将会产生结果,将您的haskell线程置于阻塞状态,等待TVar
或TChan
之一的状态更改以便可以重试。这就是避免检查IO
中的值并使用yield
函数的方法。 - 与 MVar 一样,TVar 也可以引用 thunk,但这通常不是您想要的。也许有人应该制作一个 hackage 包,定义
STVar
、STChan
、SBTChan
和BTChan
(严格和/或有界 TVar)和 TChans)。 - 实际上有必要编写
newBTChanIO
而不是利用newBTChan
,因为new{TVar,TChan}IO
的实现是可以工作的即使在unsafePerformIO
下,atomically
也无法做到这一点。
编辑:通过将 TVar 分为一个供读者使用,一个供作者使用,从而减少争用,您实际上可以获得 2-5 倍的性能提升(取决于您使用的边界)。使用标准进行验证。改进版本 0.2.1 已经在 hackage 中。
关于haskell - 如何在 Haskell 的 TChan 上的生产者/消费者情况下限制生产者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5277633/