haskell - 在 Haskell 中,如何在 Web 客户端断开连接时中止计算

标签 haskell tcp haskell-snap-framework

我有一个基于 Haskell 的网络服务,它执行的计算对于某些输入可能需要很长时间才能完成。 (这里的“真长”是指超过一分钟)

因为执行该计算会占用服务器上所有可用的 CPU,所以当传入请求到达时,我将它们放入队列中(好吧,实际上是一个堆栈,原因与典型客户端有关,但这不是重点)在当前运行的计算完成时为它们提供服务。

我的问题是客户端并不总是等待足够长的时间,有时会在他们端超时、断开连接并尝试使用不同的服务器(好吧,他们再次尝试并遇到了 elb,通常会得到不同的实例) .此外,有时 Web 客户端要求的计算会由于外部因素而变得过时,并且 Web 客户端将被终止。

在那些情况下,我真的希望能够在我从堆栈中拉出下一个请求并开始(昂贵的)计算之前检测到 Web 客户端已经消失。不幸的是,我对 snap 的体验让我相信在该框架中没有办法询问“客户端的 TCP 连接是否仍然连接?”而且我还没有找到涵盖“客户端断开连接”案例的其他 Web 框架的任何文档。

那么有没有Haskell web框架可以很方便的检测web客户端是否断开连接呢?或者如果做不到这一点,是否至少有一种方法可以使之成为可能?

(我理解可能无法在所有情况下都绝对确定 TCP 客户端是否还没有向另一端发送数据;但是,当客户端实际向服务器和服务器的框架发送 RST 数据包时不让应用程序代码确定连接已经消失,这是一个问题)


顺便说一下,尽管有人可能会怀疑 warp's onClose 处理程序会让您执行此操作,仅当响应准备就绪并写入客户端时才会触发,因此作为中止正在进行的计算的一种方式是无用的。似乎也没有办法访问接受的套接字以设置 SO_KEEPALIVE 或类似的。 (有方法可以访问初始监听套接字,但不能访问接受的套接字)

最佳答案

所以我找到了一个对我有用的答案,它可能对其他人也有用。

事实证明,您实际上可以充分利用 Warp 的内部结构来执行此操作,但是您剩下的是 Warp 的基本版本,如果您需要诸如日志记录之类的东西,将需要在上面添加其他包。

此外,请注意所谓的“半关闭”连接(当客户端关闭其发送端,但仍在等待数据时)将被检测为已关闭,从而中断您的计算。我不知道有任何处理半关闭连接的 HTTP 客户端,但只是需要注意一些事情。

无论如何,我所做的是首先复制 Network.Wai.Handler.Warp 公开的函数 runSettingsrunSettingsSocket Network.Wai.Handler.Warp.Internal 并制作了调用我提供的函数而不是 WarpI.socketConnection 的版本,这样我就有了签名:

runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
             -> Wai.Application -> IO ()

这需要复制一些辅助方法,例如 setSocketCloseOnExecwindowsThreadBlockHack。双 IO 签名可能看起来很奇怪,但这正是您想要的 - 外部 IO 在主线程中运行(调用 accept ) 并且内部 IOaccept 返回后 fork 的每个连接线程中运行。原始的Warp函数runSettings等同于:

\set -> runSettings' set (WarpI.socketConnection >=> return . return)

然后我做了:

data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared

runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
  runSettings' set (WarpI.socketConnection >=> return . wrapConn)
  where
    -- Fork a 'monitor' thread that does nothing but attempt to
    -- perform a read from conn in a loop 1/sec, and wrap the receive
    -- methods on conn so that they first consume from the stuff read
    -- by the monitoring thread. If the monitoring thread sees
    -- end-of-file (signaled by an empty string read), raise
    -- ClientDisappered on the per-connection thread.
    wrapConn conn = do
      tid <- myThreadId
      nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
      semaphore <- newMVar ()
      readerCount <- newIORef (0 :: Int)
      monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
      return $ conn {
        WarpI.connClose = throwTo monitorThread ClientDisappeared
                          >> WarpI.connClose conn
        , WarpI.connRecv = newRecv nxtBstr semaphore readerCount
        , WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
        }
      where
        newRecv :: MVar ByteString -> MVar () -> IORef Int
                -> IO ByteString
        newRecv nxtBstr sem readerCount =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
                                   case w of
                                     Just w' -> return w'
                                     Nothing -> WarpI.connRecv conn
          )

        newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
                   -> WarpI.Buffer -> WarpI.BufSize -> IO Bool
        newRecvBuf nxtBstr sem readerCount buf bufSize =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do
              (fulfilled, buf', bufSize') <-
                if bufSize == 0 then return (False, buf, bufSize)
                else
                  do w <- tryTakeMVar nxtBstr
                     case w of
                       Nothing -> return (False, buf, bufSize)
                       Just w' -> do
                         let wlen = B.length w'
                         if wlen > bufSize
                           then do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') bufSize
                                   putMVar nxtBstr (B.drop bufSize w')
                                   return (True, buf, 0)
                           else do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') wlen
                                   return (wlen == bufSize, plusPtr buf wlen,
                                           bufSize - wlen)
              if fulfilled then return True
                else WarpI.connRecvBuf conn buf' bufSize'
          )
        dropClientDisappeared :: ClientDisappeared -> IO ()
        dropClientDisappeared _ = return ()
        monitor tid nxtBstr sem st =
          catch (monitor' tid nxtBstr sem st) dropClientDisappeared

        monitor' tid nxtBstr sem st = do
          (hitEOF, readerCount) <- withMVar sem $ \_ -> do
            w <- tryTakeMVar nxtBstr
            case w of
              -- No one picked up our bytestring from last time
              Just w' -> putMVar nxtBstr w' >> return (False, 0)
              Nothing -> do
                w <- WarpI.connRecv conn
                putMVar nxtBstr w
                readerCount <- readIORef st
                return (B.null w, readerCount)
          if hitEOF && (readerCount == 0)
            -- Don't signal if main thread is also trying to read -
            -- in that case, main thread will see EOF directly
            then throwTo tid ClientDisappeared
            else do threadDelay oneSecondInMicros
                    monitor' tid nxtBstr sem st
        oneSecondInMicros = 1000000

关于haskell - 在 Haskell 中,如何在 Web 客户端断开连接时中止计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46041262/

相关文章:

haskell - gtk 和 gtk2 之间的区别

haskell - ghc 如何知道要使用 fmap 等的哪个定义?

Python线程错误

mysql - 如何使用 Snap 的 MysqlSimple snaplet 连接到 MySQL

web-services - 使用 Snap Web Framework 为 Heist 模板添加 OnLoad Hook

haskell - 为什么 GHC 提示非详尽的模式?

haskell - 以惯用方式自动重新连接 Haskell 网络连接

java - C#-服务器和 Java-客户端 : TCP Socket Communication Issues

适用于 Windows 和 Linux 的 c++ xml、tcp/ip 库

haskell - 在 snap 中使用 reader monad(或者在 snap 中使用 monad 转换器)