haskell - 共享可变状态: when to use IORefs

标签 haskell ioref shared-state

我有一个写入 Map 和 PSQ 的主线程。在 Map 和 PSQ 中,我使用相同的键,以便通过查看 PSQ,可以以 O(1) 复杂度找到具有最小优先级的条目,并将其映射到 Map 中的值。

现在,虽然我的主线程在需要时添加/修改了 Map 和 PSQ,但我还有第二个线程不断(forever $ do)查看 PSQ 以确定最旧的 key 何时出现是 N 毫秒前,然后应该刷新它。

要实现这一点,两个线程都需要查看相同的可变数据。维持状态的最佳方法是什么? IOREfs 会遇到这种情况吗?还有什么可能的方法来解决这个问题?

“一些”预 Alpha 代码在这里:

import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B 

--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages

key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
      time <- getPOSIXTime
      let q = PSQ.singleton key time
      let m = Map.singleton key messages
      return (q, m)

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
      let Just messages' = Map.lookup (host,port) m
          l = length . concat $ messages'
          l' = l + length newmsgs
      in 
      if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
      time <- getPOSIXTime
      let q1 = PSQ.insert (a,b) time q
      let m1 = Map.insert (a,b) c m
      return (q1, m1)

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
      hostAddr <- inet_addr host
      sendAllTo s datastring (SockAddrInet port hostAddr)
      return ()

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
      where 
         m' = Map.delete (host, port) m
         q' = PSQ.delete (host, port) q

loopMyQ q m1 done = forever $ do 
      let Just m = PSQ.findMin q
      let time = (PSQ.prio m) + 0.200 --adds 200ms
      now <- getPOSIXTime
      if now < time
        then print (m1) 
        --here eventually I would call the send function to flush the queue
        else putMVar done ()

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
sendrecv s q1 m1 msg = do
     let m2 = appendMsg msg key m1
         (q3, m3) = case m2 of   
                   val | m2 == m1 -> deleteRec key q1 m1
                       | otherwise -> (q1, m2)
     (q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
                                        return (q4, m4)) else return (q1, m2)
     when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
     return (q5, m5)

--main :: IO()
main = withSocketsDo $ do
     s <- socket AF_INET Datagram defaultProtocol
     (q1, m1) <- newRq
     done <- newEmptyMVar
     forkIO $ loopMyQ q1 m1 done
     (q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
     takeMVar done
     --print ("longer than 200ms ago")

最佳答案

您很可能想使用MVarsTVars保持线程间的一致状态。 IORef 不是线程安全的。

我建议使用 STM(和 TVar)来解决此问题。您正在处理对多个数据结构的并发访问,并且 STM 的可组合性比必须考虑 MVar 的锁定顺序更容易处理。

查看您的代码后,TVar 似乎是您最好的选择。将您的 PSQ 和 Map 包装在两个不同的 TVar 中。将需要两者一致 View 的所有代码包装在原子事务中。在大多数情况下,您的代码将“正常工作”。但是,如果存在锁争用,原子 block 将被重试,直到它起作用为止。

关于haskell - 共享可变状态: when to use IORefs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9369205/

相关文章:

haskell - 无法理解 haskell 类型签名 "instance Misty ((->) t "

Haskell 重写规则未在不同模块中触发

Haskell:尝试并行 `atomicModifyIORef` 实现

Haskell IORef - 一个答案与一个获得答案的函数

python - 如何在 Python 中使用 Managers() 在多个进程之间共享一个字符串?

haskell - LLVM 对 GHC 的调用约定

haskell - 在llvm绑定(bind)中将函数作为形式参数传递时,上下文缩减堆栈溢出

variables - State、ST、IORef 和 MVar 之间的区别

c++ - 在模板实例化之间共享静态成员? (不可能的?)

python - 在 ruamel.yaml 中使用自定义构造函数时如何避免全局状态?