haskell - 我将如何使用每次传入都会重置的超时进行管道传输?

标签 haskell concurrency haskell-pipes

withTimeout函数假设管道 ConsoleEventCeTimeout每发送一次 s :: Int秒,如果没有收到任何东西。相反,它无法发送 CeTimeout在适当的时间举办事件。一个CeTimeout如果大于 s,则替换其他事件。几秒钟过去了,原始事件丢失了。也不是一个CeTimeout事件,应该是n*s CeTimeout n 的事件计算每个 s已经过去的第二个时期。错误在哪里,更正的地方是什么?谢谢!

withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ()) 
    work =
      do
        (oSent, iKept) <- spawn $ bounded 1
        (oKept, iSent) <- spawn $ unbounded
        (oTimeout, iTimeout) <- spawn $ bounded 1

        tid <- launchTimeout oTimeout >>= newMVar

        forkIO $ do
          runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept

        forkIO $ do
          runEffect . forever $ fromInput iTimeout >-> toOutput oKept

        return $ do
          await >>= (liftIO . guardedSend oSent)
          (liftIO . guardedRecv $ iSent) >>= yield

    guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
    guardedSend o ce =
      (atomically $ send o ce) >>= \case
        True -> return ()
        otherwise -> die $ "withTimeout can not send"

    guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
    guardedRecv i =
      (atomically $ recv i) >>= \case
        Just a -> return a
        otherwise -> die $ "withTimeout can not recv"

    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

这是一个完全可执行的script .

最佳答案

好像是 Pipe只允许一个 yield根据 await .这意味着 CeTimeout不能随意向下输送,因为没有任何东西进入管道引起流动。我将不得不通过消息来源确认这一点;同时,该函数已被重构为返回 PipeProducer而不仅仅是 Pipe . Producer然后可以在调用函数中加入。最初的计划是只返回一个 Pipe这样调用函数就不必做任何额外的工作来使超时工作。那将是一个更加独立的解决方案。这个替代方案很好,因为它更明确。对于不熟悉管道的人来说,超时不会看起来像是凭空出现的。

withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
  do
    (oTimeout, iTimeout) <- spawn $ bounded 1
    vTid <- launchTimeout oTimeout >>= newMVar

    return (factorTimeout vTid oTimeout, fromInput iTimeout)
  where
    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

main :: IO ()
main =
  do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    exitSemaphore <- newEmptyMVar
    (o1, i1) <- spawn $ bounded 1
    (o2, i2) <- spawn $ bounded 1

    (timeoutTrap, timeoutRender) <- withTimeout 2

    runEffect $ yield CeBegan >-> toOutput o1

    forkIO $ do
      runEffect . forever $ chars >-> toOutput o1
      putMVar exitSemaphore ()

    -- other inputs would be piped to o1 here

    forkIO $ do
      runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      runEffect . forever $ timeoutRender >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      -- logic would be done before dumpPipe
      runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
      putMVar exitSemaphore ()

    takeMVar exitSemaphore

这是一个完全可执行的script .

关于haskell - 我将如何使用每次传入都会重置的超时进行管道传输?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52540672/

相关文章:

indentation - 缩进在这里起什么作用?为什么一个缩进不起作用?

haskell - 简单的应用仿函数示例

recursion - 通过 WaitGroup 编排递归快速排序调用

Haskell 快速并发队列

Haskell Pipes——让管道消耗它产生的东西(本身)

类型别名声明中的 Haskell 类型类

haskell - 部分未装箱的向量

concurrency - 图灵完备和并行编程(真正的并发)

mysql - Django 模型创建时的竞争条件

haskell - 处理流媒体库中的和编码