问题
你好!我正在编写一个日志库,我很想创建一个记录器,它会在单独的线程中运行,而所有应用程序线程只会向它发送消息。我想为这个问题找到最高效的解决方案。我在这里需要简单的未绑定(bind)队列。
联系方式
我已经创建了一些测试来查看可用解决方案的执行情况,我在这里得到了非常奇怪的结果。我基于以下内容测试了 4 个实现(源代码如下):
测试
这是用于测试的源代码:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
Pipes.performGC
pipesHandler max = loop 0
where
loop mnum = do
if mnum == max
then lift $ pure ()
else do event <- await
case event of
Msg _ -> loop (mnum + 1)
Status -> (lift $ putStrLn (show mnum)) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
handlerIO f max = loop 0 where
loop mnum = do
if mnum == max
then pure ()
else do event <- f
case event of
Msg _ -> loop (mnum + 1)
Status -> putStrLn (show mnum) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain [
bench "pipes" $ nfIO $ do
(output, input) <- Pipes.spawn Pipes.Unbounded
replicateM_ prodNum (pipesAddProducer msgNum output)
runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
, bench "Chan" $ nfIO $ do
ch <- newChan
replicateM_ prodNum (chanAddProducer msgNum ch)
chanHandler ch totalMsg
, bench "Unagi-Chan" $ nfIO $ do
(inCh, outCh) <- U.newChan
replicateM_ prodNum (uchanAddProducer msgNum inCh)
uchanHandler outCh totalMsg
, bench "MVar" $ nfIO $ do
m <- newEmptyMVar
replicateM_ prodNum (mvarAddProducer msgNum m)
mvarHandler m totalMsg
]
where
prodNum = 20
msgNum = 1000
totalMsg = msgNum * prodNum
你可以用
ghc -O2 Main.hs
编译它并运行它。测试创建了 20 个消息生产者,每个生产者生产 1000000 条消息。
结果
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
问题
我很想问问你为什么管道并发版本执行得这么慢,为什么它甚至比基于 chan 的版本慢得多。我很惊讶 MVar 是所有版本中最快的一个 - 谁能告诉更多,为什么我们会得到这个结果,我们是否可以在任何情况下做得更好?
最佳答案
所以我可以给大家简单介绍一下Chan
的一些分析。和 TQueue
(这里 pipes-concurrency
正在内部使用)激发了一些进入 unagi-chan
的设计决策。 .我不确定它是否会回答你的问题。我建议在进行基准测试时 fork 不同的队列并进行变化,以真正了解正在发生的事情。
陈Chan
好像:
data Chan a
= Chan (MVar (Stream a)) -- pointer to "head", where we read from
(MVar (Stream a)) -- pointer to "tail", where values written to
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
这是一个
MVar
的链表s。两人MVar
s 在 Chan
type 分别充当指向列表当前头部和尾部的指针。这是写的样子:writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
new_hole <- newEmptyMVar mask_ $ do
old_hole <- takeMVar writeVar -- [1]
putMVar old_hole (ChItem val new_hole) -- [2]
putMVar writeVar new_hole -- [3]
在 1 时,作者在写端锁定,在 2 时我们的项目
a
可供读者使用,并且在 3 处为其他作者解锁写端。这实际上在单消费者/单生产者场景中表现得很好(参见 the graph here ),因为读取和写入不竞争。但是一旦你有多个并发作者,你就会开始遇到麻烦:
你基本上是通过调度程序进行一次大的往返,进入
MVar
的等待队列。然后终于可以完成写入。 最后,使用
MVar
-per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。队列
TQueue
很棒,因为 STM
使得推断其正确性变得非常简单。这是一个功能性出队风格的队列,还有一个 write
包括简单地读取 writer 堆栈,consing 我们的元素,然后将其写回:data TQueue a = TQueue (TVar [a])
(TVar [a])
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do
listend <- readTVar write -- a transaction with a consistent
writeTVar write (a:listend) -- view of memory
如果在
writeTQueue
之后写回它的新堆栈,另一个交错写入也做同样的事情,其中一个写入将被重试。更多writeTQueue
s 是交错的,争用的效果会恶化。然而,性能下降比 Chan
慢得多。因为只有一个 writeTVar
可以取消竞争的操作writeTQueue
s,并且交易非常小(只是一个读取和一个 (:)
)。读取的工作原理是从写入端“出列”堆栈,反转它,并将反转的堆栈存储在它自己的变量中以便于“弹出”(总之,这给了我们摊销的 O(1) 推送和弹出)
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
读者对作者有一个对称的温和争论问题。在一般情况下,读者和作者不会竞争,但是当读者堆栈耗尽时,读者会与其他读者和作者竞争。我怀疑你是否预装了
TQueue
有足够的值,然后启动 4 个读取器和 4 个写入器,您可能会引发活锁,因为在下一次写入之前反向难以完成。值得注意的是,与 MVar
不同的是, 写信给 TVar
许多读者正在等待,同时唤醒他们(这可能或多或少效率高,取决于场景)。我怀疑你没有看到
TQueue
的很多弱点在你的测试中;主要是您看到了写入争用的适度影响以及大量分配和 GC 大量可变对象的开销。鳗鱼酱
unagi-chan
最初旨在很好地处理争用。它在概念上非常简单,但实现有一些复杂性data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
data Cell a = Empty | Written a | Blocking (MVar a)
队列的读写端共享
Stream
它们协调传递值(从写入器到读取器)和阻塞指示(从读取器到写入器),读取和写入端每个都有一个独立的原子计数器。写操作如下:incrCounter
在写计数器上接收其唯一索引以与其(单个)读取器协调Written a
的 CAS (\Blocking v)-> putMVar v a)
并退出。 读取以类似且明显的方式工作。
第一个创新是使争用点成为原子操作,在争用时不会降级(如 CAS/重试循环或类似 Chan 的锁)。基于简单的基准测试和实验,fetch-and-add primop, exposed by the
atomic-primops
library效果最好。然后在 2 中,读取器和写入器都只需要执行一次比较和交换(读取器的快速路径是简单的非原子读取)来完成协调。
所以要尝试制作
unagi-chan
很好,我们unagi-chan
文档中,并注意 Chan
在这里有更好的语义)最后一个注意事项。使用数组:并发写入数组对于扩展来说通常是一个坏主意,因为您会导致大量缓存一致性流量,因为缓存行在编写器线程之间来回无效。通用术语是“虚假共享”。但是我能想到的替代设计也有缓存方面的优点和缺点,比如 strip 写入或其他东西;我一直在对此进行一些试验,但目前没有任何结论。
我们合理关注虚假共享的一个地方是在我们的计数器中,我们将其对齐并填充为 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。
关于Haskell 快速并发队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27933941/