haskell - 使用 postgresql-simple 创建流式管道源

标签 haskell conduit postgresql-simple

postgresql-simple提供流查询的功能,例如

fold 
  :: (FromRow row, ToRow params)
  => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a

我想创建一个充分利用流媒体的管道源。
mySource :: (FromRow row, Monad m) => Source m row

不幸的是,因为IO出现在 fold 中的逆变位置(我认为?) ,我真的很纠结这些类型。以下类型检查,但在产生值之前折叠整个流。
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
  where
    foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
    foo cond evt = pure (cond >> C.yield evt)

任何关于如何实现这一点的指针将不胜感激!谢谢!

最佳答案

一种(不太好)的方法来解决这个问题

  • 做一个新的 TMChan 接收行
  • 套装 foreach_ 将行转储到此 channel
  • 最后使用 stm-conduit 从 channel 中制作源

  • 我没有办法立即测试这个,但以下应该有效
    import Conduit
    import Database.PostgreSQL.Simple (foreach_)
    import Data.Conduit.TMChan (sourceTMChan)
    import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically)
    
    mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
    mySource connection query = do
      chan <- newTMChanIO
      forEach_ connection query (atomically . writeTMChan chan)
      pure (sourceTMChan chan)
    

    如果我们有 forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m ()这可能更容易...

    关于haskell - 使用 postgresql-simple 创建流式管道源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41643485/

    相关文章:

    function - 为什么这些褶皱停在头/尾?

    list - 是否 `(' a' :_)` represent a tuple or a list?

    haskell - 函数类型签名中的右结合性

    haskell - 如何在不使用记录语法的情况下实现 MonadState 类?

    haskell 管道 : having a Sink return a value based on the values from upstream

    haskell - 使用网络管道时如何刷新网络流?

    Haskell 导管 : is it possible to optionally have the result of a source?

    PostgreSQL Simple 在一个元组中最多只支持 10 个变量,但我需要更多