haskell - Conduit - 分派(dispatch)到多个输出文件

标签 haskell conduit

我正在尝试将项目从管道分派(dispatch)到许多输出文件中,问题与 Conduit - Multiple output file within the pipeline 非常相似,有一些差异:

  • 在前面的解决方案中,每个接收器都有一个过滤器,用于决定元素是否属于该接收器。在我的例子中,来自上游的每个元素都精确地发送到一个文件,并且在存在大量文件的情况下,最好只进行一次操作来决定它发送到哪个文件。

  • 文件是根据需要创建的。 “选择器”函数决定下一个元素将到达哪个接收器,如果尚不存在,则使用“创建新接收器”函数创建它。

例如,如果源输出:8 4 7 1 5
接收器选择器是模块 3,则操作顺序为:

Create file 2
Add 8 to file 2
Create file 1
Add 4 to file 1
Add 7 to file 1
Add 1 to file 1
Add 5 to file 2

我正在考虑这样的调度程序类型:

        dispatcherSink_ :: (Monad m) => 
                           (a -> k) ->               -- sink selector
                           (k -> Sink a m ()) ->     -- new sink
                           Sink a m ()

我尝试使用 evalStateC 和包含接收器映射的内部 StateT 来编写该函数,但我无法绑定(bind)类型。我不确定您是否可以使用同一个水槽两次。

我想做的事情可能吗?

我仍然是 Haskell 的新手,因此我们将不胜感激。

已编辑

我虽然可以创建 ResumableSinks 的 map ,Hackage 中有一个库,但它依赖于旧的且非常特定的 Conduit 版本,因此 cabal 无法安装它。 最后我没有找到一种方法可以用以前的类型编写函数,能够与任何接收器一起工作,所以我想出了一个直接与文件一起工作的函数:

import System.IO (hClose,openFile,IOMode(WriteMode))
import Conduit
import Data.IOData
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M
import Control.Monad.State.Strict
import Data.ByteString.Char8 (pack)


fileDispatcherSink ::
  (MonadIO m, IOData c,Ord k) =>
  (a -> k) ->
  (a -> c) ->
  (k -> FilePath) ->
  Sink a m ()
fileDispatcherSink selector toChunked path =
  evalStateC M.empty $ dispatcher 
  where 
    dispatcher = do
      next  <- await
      m <- get
      case next of
        Nothing -> liftIO $ F.traverse_ hClose m
        Just a -> do
          let k = selector a
          h <- case M.lookup k m of
                Nothing -> do
                  nh <- liftIO $ openFile (path k) WriteMode
                  put $ M.insert k nh m
                  return nh
                Just h -> return h
          yield (toChunked a) $$ sinkHandle h
          dispatcher

testSource :: (Monad m) => Source m Int
testSource = yieldMany [8, 4, 7, 1, 5]

main :: IO ()
main = testSource
       $$ fileDispatcherSink (`mod` 3) (pack . show) ((++ ".txt") . show)

有没有办法编写_dispatcherSink__函数?

最佳答案

实现时存在概念问题

dispatcherSink_ :: (Monad m) => 
                   (a -> k) ->               -- sink selector
                   (k -> Sink a m ()) ->     -- new sink
                   Sink a m ()

。在管道中,数据从上游拉到下游,而不是推送。因此,Sink 决定是否从其上游管道请求下一个输入值。因此,您无法真正拥有一个 Sink 映射,读取输入值,然后将其提供给其中一个 Sink。您选择的Sink可能不会决定读取输入值,它可能会决定完成,然后您将如何处理输入值?您可以为该键创建一个新接收器,但它也可以决定不接受输入。

因此,您很可能需要一些不同的概念,而不是Sink,您可以向其中推送值以及可以最终确定的内容。一个想法(未经测试):

data PushSink m i = PushSink { psPush :: i -> m (PushSink m i)
                             , psFinalize :: m () }

写入文件的实现会打开一个文件,保留句柄,而 psPush 只会将一个 block 写入文件,返回相同的对象,而 psFinalize 会关闭文件。

然后你可以实现这样的变体

dispatcherSink_ :: (Monad m) => 
                   (a -> k) ->                 -- sink selector
                   (k -> m (PushSink a m)) ->  -- new sink
                   Sink a m ()

它将值推送到 PushSink 并在没有输入时最终确定它们。

关于haskell - Conduit - 分派(dispatch)到多个输出文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25046000/

相关文章:

haskell - 使用 Haskell 的 zip-conduit 从 zip 存档中的文件中读取行

haskell - 分割导管链

haskell - 有没有一种简单的方法可以将派生 Enum 的数据类型转换为列表?

haskell - 在 Haskell 中将整数输出到标准输出

networking - Http-Conduit 频繁连接失败

exception - 在 MonadResource 实例中捕获 IO 异常

haskell - 使用 HTTP 管道时如何干净地处理所有可能的错误?

debugging - :type in ghci in a ghc source file? 最接近的等价物是多少

c - withArray 与 newArray

haskell - 具有超时的并发 Haskell 操作