Haskell 快速并发队列

标签 haskell concurrency profiling stm haskell-pipes

问题

你好!我正在编写一个日志库,我很想创建一个记录器,它会在单独的线程中运行,而所有应用程序线程只会向它发送消息。我想为这个问题找到最高效的解决方案。我在这里需要简单的未绑定(bind)队列。

联系方式

我已经创建了一些测试来查看可用解决方案的执行情况,我在这里得到了非常奇怪的结果。我基于以下内容测试了 4 个实现(源代码如下):

  • pipes-concurrency
  • Control.Concurrent.Chan
  • Control.Concurrent.Chan.Unagi
  • MVar based as described in the book "Parallel and Concurrent Programming in Haskell"请注意,此技术为我们提供了容量为 1 的有界队列 - 它仅用于测试

  • 测试

    这是用于测试的源代码:
    {-# LANGUAGE NoMonomorphismRestriction #-}
    
    import Control.Concurrent (threadDelay)
    import Control.Monad (forever)
    import Pipes
    import qualified Pipes.Concurrent as Pipes
    import Control.Applicative
    import Control.Monad (replicateM_)
    import System.Environment (getArgs)
    
    import Control.Concurrent.Chan
    import Control.Concurrent (forkIO)
    import qualified Control.Concurrent.Chan.Unagi as U
    import Control.Concurrent.MVar
    import Criterion.Main
    
    data Event = Msg String | Status | Quit deriving (Show)
    
    ----------------------------------------------------------------------
    -- Pipes
    ----------------------------------------------------------------------
    
    pipesLogMsg = yield (Msg "hello")
    pipesManyLogs num = replicateM_ num pipesLogMsg
    
    pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                               Pipes.performGC
    
    pipesHandler max = loop 0
      where
        loop mnum = do
            if mnum == max
                then lift $ pure ()
                else do event <- await
                        case event of
                            Msg _  -> loop (mnum + 1)
                            Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                            Quit   -> return ()
    
    ----------------------------------------------------------------------
    -- Chan
    ----------------------------------------------------------------------
    
    chanAddProducer num ch = forkIO $ chanManyLogs num ch
    chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
    chanHandler ch max = handlerIO (readChan ch) max
    
    ----------------------------------------------------------------------
    -- Unagi-Chan
    ----------------------------------------------------------------------
    
    uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
    uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
    uchanHandler ch max = handlerIO (U.readChan ch) max
    
    ----------------------------------------------------------------------
    -- MVars
    ----------------------------------------------------------------------
    
    mvarAddProducer num m = forkIO $ mvarManyLogs num m
    mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
    mvarHandler m max = handlerIO (takeMVar m) max
    
    ----------------------------------------------------------------------
    -- Utils
    ----------------------------------------------------------------------
    
    handlerIO f max = loop 0 where
        loop mnum = do
            if mnum == max 
                then pure ()
                else do event <- f
                        case event of
                             Msg _  -> loop (mnum + 1)
                             Status -> putStrLn (show mnum) *> loop mnum
                             Quit   -> return ()
    
    ----------------------------------------------------------------------
    -- Main
    ----------------------------------------------------------------------
    
    main = defaultMain [
          bench "pipes" $ nfIO $ do
            (output, input) <- Pipes.spawn Pipes.Unbounded
            replicateM_ prodNum (pipesAddProducer msgNum output)
            runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
        , bench "Chan" $ nfIO $ do
            ch <- newChan
            replicateM_ prodNum (chanAddProducer msgNum ch)
            chanHandler ch totalMsg
        , bench "Unagi-Chan" $ nfIO $ do
            (inCh, outCh) <- U.newChan
            replicateM_ prodNum (uchanAddProducer msgNum inCh)
            uchanHandler outCh totalMsg
        , bench "MVar" $ nfIO $ do
            m <- newEmptyMVar
            replicateM_ prodNum (mvarAddProducer msgNum m)
            mvarHandler m totalMsg
        ]
      where
        prodNum  = 20
        msgNum   = 1000
        totalMsg = msgNum * prodNum
    

    你可以用 ghc -O2 Main.hs 编译它并运行它。
    测试创建了 20 个消息生产者,每个生产者生产 1000000 条消息。

    结果
    benchmarking pipes
    time                 46.68 ms   (46.19 ms .. 47.31 ms)
                         0.999 R²   (0.999 R² .. 1.000 R²)
    mean                 47.59 ms   (47.20 ms .. 47.95 ms)
    std dev              708.3 μs   (558.4 μs .. 906.1 μs)
    
    benchmarking Chan
    time                 4.252 ms   (4.171 ms .. 4.351 ms)
                         0.995 R²   (0.991 R² .. 0.998 R²)
    mean                 4.233 ms   (4.154 ms .. 4.314 ms)
    std dev              244.8 μs   (186.3 μs .. 333.5 μs)
    variance introduced by outliers: 35% (moderately inflated)
    
    benchmarking Unagi-Chan
    time                 1.209 ms   (1.198 ms .. 1.224 ms)
                         0.996 R²   (0.993 R² .. 0.999 R²)
    mean                 1.267 ms   (1.244 ms .. 1.308 ms)
    std dev              102.4 μs   (61.70 μs .. 169.3 μs)
    variance introduced by outliers: 62% (severely inflated)
    
    benchmarking MVar
    time                 1.746 ms   (1.714 ms .. 1.774 ms)
                         0.997 R²   (0.995 R² .. 0.998 R²)
    mean                 1.716 ms   (1.694 ms .. 1.739 ms)
    std dev              73.99 μs   (65.32 μs .. 85.48 μs)
    variance introduced by outliers: 29% (moderately inflated)
    

    问题

    我很想问问你为什么管道并发版本执行得这么慢,为什么它甚至比基于 chan 的版本慢得多。我很惊讶 MVar 是所有版本中最快的一个 - 谁能告诉更多,为什么我们会得到这个结果,我们是否可以在任何情况下做得更好?

    最佳答案

    所以我可以给大家简单介绍一下Chan的一些分析。和 TQueue (这里 pipes-concurrency 正在内部使用)激发了一些进入 unagi-chan 的设计决策。 .我不确定它是否会回答你的问题。我建议在进行基准测试时 fork 不同的队列并进行变化,以真正了解正在发生的事情。


    Chan好像:

    data Chan a
     = Chan (MVar (Stream a)) -- pointer to "head", where we read from
            (MVar (Stream a)) -- pointer to "tail", where values written to
    
    type Stream a = MVar (ChItem a)
    data ChItem a = ChItem a (Stream a)
    

    这是一个 MVar 的链表s。两人MVar s 在 Chan type 分别充当指向列表当前头部和尾部的指针。这是写的样子:
    writeChan :: Chan a -> a -> IO () 
    writeChan (Chan _ writeVar) val = do 
        new_hole <- newEmptyMVar   mask_ $ do
        old_hole <- takeMVar writeVar           -- [1]
        putMVar old_hole (ChItem val new_hole)  -- [2]
        putMVar writeVar new_hole               -- [3]
    

    在 1 时,作者在写端锁定,在 2 时我们的项目 a可供读者使用,并且在 3 处为其他作者解锁写端。

    这实际上在单消费者/单生产者场景中表现得很好(参见 the graph here ),因为读取和写入不竞争。但是一旦你有多个并发作者,你就会开始遇到麻烦:
  • 一个写入 1 而另一个写入 2 的写入将被阻塞并被取消调度(我能够测量上下文切换的最快速度是 ~150ns(非常快);在某些情况下它可能会慢得多)。所以当你有很多作家在竞争时
    你基本上是通过调度程序进行一次大的往返,进入 MVar 的等待队列。然后终于可以完成写入。
  • 当写入器在 2 时被取消调度(因为它超时)时,它会持有一个锁,并且在可以再次重新调度之前不允许写入完成;当我们超额订阅时,即当我们的线程/核心比率很高时,这会成为一个更大的问题。

  • 最后,使用 MVar -per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。

    队列
    TQueue很棒,因为 STM使得推断其正确性变得非常简单。这是一个功能性出队风格的队列,还有一个 write包括简单地读取 writer 堆栈,consing 我们的元素,然后将其写回:
    data TQueue a = TQueue (TVar [a])
                           (TVar [a])
    
    writeTQueue :: TQueue a -> a -> STM ()
    writeTQueue (TQueue _ write) a = do  
      listend <- readTVar write   -- a transaction with a consistent 
      writeTVar write (a:listend) -- view of memory
    

    如果在 writeTQueue 之后写回它的新堆栈,另一个交错写入也做同样的事情,其中​​一个写入将被重试。更多writeTQueue s 是交错的,争用的效果会恶化。然而,性能下降比 Chan 慢得多。因为只有一个 writeTVar可以取消竞争的操作writeTQueue s,并且交易非常小(只是一个读取和一个 (:) )。

    读取的工作原理是从写入端“出列”堆栈,反转它,并将反转的堆栈存储在它自己的变量中以便于“弹出”(总之,这给了我们摊销的 O(1) 推送和弹出)
    readTQueue :: TQueue a -> STM a
    readTQueue (TQueue read write) = do
      xs <- readTVar read
      case xs of
        (x:xs') -> do writeTVar read xs'
                      return x
        [] -> do ys <- readTVar write
                 case ys of
                   [] -> retry
                   _  -> case reverse ys of
                           [] -> error "readTQueue"
                           (z:zs) -> do writeTVar write []
                                        writeTVar read zs
                                        return z
    

    读者对作者有一个对称的温和争论问题。在一般情况下,读者和作者不会竞争,但是当读者堆栈耗尽时,读者会与其他读者和作者竞争。我怀疑你是否预装了 TQueue有足够的值,然后启动 4 个读取器和 4 个写入器,您可能会引发活锁,因为在下一次写入之前反向难以完成。值得注意的是,与 MVar 不同的是, 写信给 TVar许多读者正在等待,同时唤醒他们(这可能或多或少效率高,取决于场景)。

    我怀疑你没有看到 TQueue 的很多弱点在你的测试中;主要是您看到了写入争用的适度影响以及大量分配和 GC 大量可变对象的开销。

    鳗鱼酱
    unagi-chan最初旨在很好地处理争用。它在概念上非常简单,但实现有一些复杂性
    data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
    
    data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
    
    data Cell a = Empty | Written a | Blocking (MVar a)
    

    队列的读写端共享 Stream它们协调传递值(从写入器到读取器)和阻塞指示(从读取器到写入器),读取和写入端每个都有一个独立的原子计数器。写操作如下:
  • 作者调用原子 incrCounter在写计数器上接收其唯一索引以与其(单个)读取器协调
  • 编写器找到它的单元格并执行 Written a 的 CAS
  • 如果成功它退出,否则它看到一个读者已经击败它并且正在阻塞(或继续阻塞),所以它执行 (\Blocking v)-> putMVar v a)并退出。

  • 读取以类似且明显的方式工作。

    第一个创新是使争用点成为原子操作,在争用时不会降级(如 CAS/重试循环或类似 Chan 的锁)。基于简单的基准测试和实验,fetch-and-add primop, exposed by the atomic-primops library效果最好。

    然后在 2 中,读取器和写入器都只需要执行一次比较和交换(读取器的快速路径是简单的非原子读取)来完成协调。

    所以要尝试制作unagi-chan很好,我们
  • 使用 fetch-and-add 处理争用点
  • 使用无锁技术,这样当我们超额订阅一个在不合适的时间被取消调度的线程时,不会阻塞其他线程的进程(被阻塞的写入器最多可以阻塞计数器“分配”给它的读取器;读取警告re. async exceptions在 unagi-chan 文档中,并注意 Chan 在这里有更好的语义)
  • 使用数组来存储我们的元素,它具有更好的局部性(但见下文)每个元素的开销更低,并且对 GC 的压力更小

  • 最后一个注意事项。使用数组:并发写入数组对于扩展来说通常是一个坏主意,因为您会导致大量缓存一致性流量,因为缓存行在编写器线程之间来回无效。通用术语是“虚假共享”。但是我能想到的替代设计也有缓存方面的优点和缺点,比如 strip 写入或其他东西;我一直在对此进行一些试验,但目前没有任何结论。

    我们合理关注虚假共享的一个地方是在我们的计数器中,我们将其对齐并填充为 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。

    关于Haskell 快速并发队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27933941/

    相关文章:

    list - 如何使用自定义版本的List : `data List a = Nil | Cons a (List a)` ?

    haskell - 以下 "Dining Philosophers"的解决方案有什么问题?

    java - JTextArea默认值?

    javascript - JavaScript内存和泄漏问题

    python - Python 的 cProfile 无法识别函数名称

    linux - 如何获取 perf stat 列出所有可能的事件

    haskell - 避免在 Hughes 的列表仿函数实例中使用 unsafeCoerce

    haskell - 如何在 emacs - haskell 模式下运行 haskell 应用程序?

    http - Golang 检测飞行中的请求

    c++ - 并行搜索不同的值?