我正在尝试编写一个旨在重现 Qt 线程语义的库:信号可以连接到插槽,并且所有插槽都在已知线程中执行,因此绑定(bind)到同一线程的插槽彼此之间是线程安全的。
我有以下 API:
data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())
mkSignal :: IO (Signal a)
mkSlot :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)
connect :: Signal a -> Slot a -> IO ()
-- callable from any thread
emit :: Signal a -> a -> IO ()
-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg
问题来自
emit
至execute
.参数需要以某种方式在运行时存储,然后执行 IO 操作,但我似乎无法通过类型检查器。我需要的东西:
我尝试过的事情:
Dynamic
执行正确键入的 IO 操作.有dynApply ,但它是纯粹的。 mkSlot
的函数,而不是基于类型的任意函数。 我错过了什么?
最佳答案
首先,您确定 Slots 真的想在特定线程中执行吗?在 Haskell 中编写线程安全代码很容易,而且 GHC 中的线程非常轻量级,因此将所有事件处理程序执行绑定(bind)到特定的 Haskell 线程并不会获得太多 yield 。
另外,mkSlot
的回调不需要给 Slot 本身:你可以使用 recursive do-notation在其回调中绑定(bind)插槽,而无需担心将结绑定(bind)到 mkSlot
.
无论如何,您不需要像那些解决方案那样复杂的东西。我希望当您谈论存在类型时,您正在考虑发送类似 (a -> IO (), a)
的内容。通过 TChan
(您在评论中提到使用)并在另一端应用它,但您想要 TChan
为任何 a 接受这种类型的值,而不仅仅是一个特定的 a。这里的关键见解是,如果您有 (a -> IO (), a)
并且不知道 a 是什么,你唯一能做的就是将函数应用到值上,给你一个 IO ()
- 所以我们可以通过 channel 发送那些!
这是一个例子:
import Data.Unique
import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
newtype SlotGroup = SlotGroup (IO () -> IO ())
data Signal a = Signal Unique (TVar [Slot a])
data Slot a = Slot Unique SlotGroup (a -> IO ())
-- When executed, this produces a function taking an IO action and returning
-- an IO action that writes that action to the internal TChan. The advantage
-- of this approach is that it's impossible for clients of newSlotGroup to
-- misuse the internals by reading the TChan or similar, and the interface is
-- kept abstract.
newSlotGroup :: IO SlotGroup
newSlotGroup = do
chan <- newTChanIO
_ <- forkIO . forever . join . atomically . readTChan $ chan
return $ SlotGroup (atomically . writeTChan chan)
mkSignal :: IO (Signal a)
mkSignal = Signal <$> newUnique <*> newTVarIO []
mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
mkSlot group f = Slot <$> newUnique <*> pure group <*> pure f
connect :: Signal a -> Slot a -> IO ()
connect (Signal _ v) slot = atomically $ do
slots <- readTVar v
writeTVar v (slot:slots)
emit :: Signal a -> a -> IO ()
emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)
execute :: Slot a -> a -> IO ()
execute (Slot _ (SlotGroup send) f) a = send (f a)
这使用
TChan
将操作发送到每个插槽所绑定(bind)的工作线程。请注意,我对 Qt 不是很熟悉,所以我可能错过了模型的一些微妙之处。您也可以通过以下方式断开插槽:
disconnect :: Signal a -> Slot a -> IO ()
disconnect (Signal _ v) (Slot u _ _) = atomically $ do
slots <- readTVar v
writeTVar v $ filter keep slots
where keep (Slot u' _) = u' /= u
您可能想要
Map Unique (Slot a)
之类的东西而不是 [Slot a]
如果这可能是一个瓶颈。所以,这里的解决方案是(a)认识到你有一些基本上基于可变状态的东西,并使用可变变量来构造它; (b) 意识到函数和 IO 操作就像其他一切一样是一流的,因此您不必做任何特殊的事情来在运行时构造它们:)
顺便说一句,我建议保留
Signal
的实现和 Slot
通过不从定义它们的模块中导出它们的构造函数来抽象;毕竟,有很多方法可以在不更改 API 的情况下解决这种方法。
关于multithreading - 跨线程存储任意函数调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8686264/