haskell - 将字节流传输到网络 websocket

标签 haskell haskell-pipes

我有一个代码,使用文件句柄来模拟流式接收器 Bytestring来自来源(AWS S3)。如果我们想使用 Network.Websocket 作为水槽,交换 LBS.writeFile 就足够了吗?在下面的代码中 sendBinaryData (带有连接句柄)?

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text ->  IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes 
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0

令我困惑的是如何确定流的终止?如果是文件,则由 writeFile 处理。 API。怎么样sendBinaryData ?它是否以与 writeFile 类似的方式处理终止?还是由客户端的数据解析器决定?

更新

这个问题是关于如何将数据流式传输到 websocket 句柄(让我们假设已经提供了句柄),就像我们在上面的示例中处理文件句柄一样,而不是关于如何在 resourceT 内管理句柄。 。 conduit似乎确实需要 mapM_ 接收器数据的方法。所以,看来这确实是一条路。

终止问题是因为我有这样的想法:如果我们有一个函数在 Websocket 句柄的另一端监听数据,那么确定消息结束似乎在流式传输中很重要语境。给定如下函数:

f :: LBS.ByteString -> a

如果我们这样做S.mapM_要将数据流式传输到 websocket 句柄,是否需要添加某种 end of stream标记以便 f在另一端监听可以停止处理惰性字节串。否则f不知道消息何时完成。

最佳答案

您认为句柄需要额外的技巧是正确的。但是,由于您已经在使用 ResourceT monad 转换器,因此这是 delightfully simple to do with allocateallocate 允许您在资源 monad 中创建一个句柄并注册一个清理操作(在您的情况下只是关闭连接)。

ok <- runResourceT $ do
  (releaseKey, handle) <-
    allocate (WebSockets.acceptRequest request) 
             (`WebSockets.sendClose` closeMessage)
  WebSockets.sendBinaryData handle data
  return ok
where
  request = ...
  closeMessage = ...
  data = ...
  ok = ...

通过使用allocate,保证在runResourceT返回ok时关闭句柄。

但是,我并不完全确定这就是您想要的。在我看来, getObject 不应该知道如何接受和关闭 WS 连接;也许它应该将 WS 连接句柄作为参数,然后写入它。如果您将其返回类型升级为 ResourceT,那么您可以将 调用者 交给 getObject,并负责调用 runResourceT并分配 WS 句柄等等。但希望上面的例子足以让您继续前进。

关于haskell - 将字节流传输到网络 websocket,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37813120/

相关文章:

haskell - 重新排序搜索空间

haskell - Maybe 从 Maybes 构建数据

haskell - 如何找出运算符 "+"的类型?

haskell - 如何将 IO 操作的输出传输到 haskell 中的进程中

haskell - 如何编写一个向下游发送它从上游接收的列表的管道?

animation - 在 Haskell 中使用光泽度制作动画

haskell - 如何静态检查图形有效性?

haskell - 在管道内运行单态消费者

haskell - 为什么 Conduit 和 Pipe 不能有 Arrow 实例?

haskell - 使用管道解析来保留 map 的剩余内容