multithreading - 跨线程存储任意函数调用

标签 multithreading haskell types signals-slots

我正在尝试编写一个旨在重现 Qt 线程语义的库:信号可以连接到插槽,并且所有插槽都在已知线程中执行,因此绑定(bind)到同一线程的插槽彼此之间是线程安全的。

我有以下 API:

data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())

mkSignal :: IO (Signal a)
mkSlot   :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)

connect :: Signal a -> Slot a -> IO ()

-- callable from any thread
emit :: Signal a -> a -> IO ()

-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg

问题来自 emitexecute .参数需要以某种方式在运行时存储,然后执行 IO 操作,但我似乎无法通过类型检查器。

我需要的东西:
  • 类型安全:信号不应连接到期望不同类型的插槽。
  • 类型无关:任何给定类型都可以有多个插槽(也许这可以通过 newtype 和/或 TH 放宽)。
  • 易用性:因为这是一个库,信号和槽应该很容易创建。

  • 我尝试过的事情:
  • Data.Dynamic : 使整个事情变得非常脆弱,我还没有找到一种方法来对 Dynamic 执行正确键入的 IO 操作.有dynApply ,但它是纯粹的。
  • Existential types : 我需要执行传递给 mkSlot 的函数,而不是基于类型的任意函数。
  • Data.HList : 我不够聪明,无法弄清楚。

  • 我错过了什么?

    最佳答案

    首先,您确定 Slots 真的想在特定线程中执行吗?在 Haskell 中编写线程安全代码很容易,而且 GHC 中的线程非常轻量级,因此将所有事件处理程序执行绑定(bind)到特定的 Haskell 线程并不会获得太多 yield 。

    另外,mkSlot的回调不需要给 Slot 本身:你可以使用 recursive do-notation在其回调中绑定(bind)插槽,而无需担心将结绑定(bind)到 mkSlot .

    无论如何,您不需要像那些解决方案那样复杂的东西。我希望当您谈论存在类型时,您正在考虑发送类似 (a -> IO (), a) 的内容。通过 TChan (您在评论中提到使用)并在另一端应用它,但您想要 TChan为任何 a 接受这种类型的值,而不仅仅是一个特定的 a。这里的关键见解是,如果您有 (a -> IO (), a)并且不知道 a 是什么,你唯一能做的就是将函数应用到值上,给你一个 IO () - 所以我们可以通过 channel 发送那些!

    这是一个例子:

    import Data.Unique
    import Control.Applicative
    import Control.Monad
    import Control.Concurrent
    import Control.Concurrent.STM
    
    newtype SlotGroup = SlotGroup (IO () -> IO ())
    
    data Signal a = Signal Unique (TVar [Slot a])
    data Slot a = Slot Unique SlotGroup (a -> IO ())
    
    -- When executed, this produces a function taking an IO action and returning
    -- an IO action that writes that action to the internal TChan. The advantage
    -- of this approach is that it's impossible for clients of newSlotGroup to
    -- misuse the internals by reading the TChan or similar, and the interface is
    -- kept abstract.
    newSlotGroup :: IO SlotGroup
    newSlotGroup = do
      chan <- newTChanIO
      _ <- forkIO . forever . join . atomically . readTChan $ chan
      return $ SlotGroup (atomically . writeTChan chan)
    
    mkSignal :: IO (Signal a)
    mkSignal = Signal <$> newUnique <*> newTVarIO []
    
    mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
    mkSlot group f = Slot <$> newUnique <*> pure group <*> pure f
    
    connect :: Signal a -> Slot a -> IO ()
    connect (Signal _ v) slot = atomically $ do
      slots <- readTVar v
      writeTVar v (slot:slots)
    
    emit :: Signal a -> a -> IO ()
    emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)
    
    execute :: Slot a -> a -> IO ()
    execute (Slot _ (SlotGroup send) f) a = send (f a)
    

    这使用 TChan将操作发送到每个插槽所绑定(bind)的工作线程。

    请注意,我对 Qt 不是很熟悉,所以我可能错过了模型的一些微妙之处。您也可以通过以下方式断开插槽:
    disconnect :: Signal a -> Slot a -> IO ()
    disconnect (Signal _ v) (Slot u _ _) = atomically $ do
      slots <- readTVar v
      writeTVar v $ filter keep slots
      where keep (Slot u' _) = u' /= u
    

    您可能想要 Map Unique (Slot a) 之类的东西而不是 [Slot a]如果这可能是一个瓶颈。

    所以,这里的解决方案是(a)认识到你有一些基本上基于可变状态的东西,并使用可变变量来构造它; (b) 意识到函数和 IO 操作就像其他一切一样是一流的,因此您不必做任何特殊的事情来在运行时构造它们:)

    顺便说一句,我建议保留 Signal 的实现和 Slot通过不从定义它们的模块中导出它们的构造函数来抽象;毕竟,有很多方法可以在不更改 API 的情况下解决这种方法。

    关于multithreading - 跨线程存储任意函数调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8686264/

    相关文章:

    java - 如何停止 BufferedReader readline()?

    string - 将 String 转换为 ByteString 的最佳方法是什么

    objective-c - 在 32 位和 64 位上编码/解码 CGFloat

    c++ - 语言之间的堆栈数据

    multithreading - Actix 系统,多个仲裁器 = 多少个线程

    java - 依赖jar中的异步注释,不启动新线程

    objective-c - iPad GUI 仅在触摸后更新

    database - 将 Haskell 与 "business applications"的数据库后端一起使用

    haskell - 使用镜头应用依赖于多个领域的功能

    c++ - 从缓冲区输入的位