我正在尝试将项目从管道分派(dispatch)到许多输出文件中,问题与 Conduit - Multiple output file within the pipeline 非常相似,有一些差异:
在前面的解决方案中,每个接收器都有一个过滤器,用于决定元素是否属于该接收器。在我的例子中,来自上游的每个元素都精确地发送到一个文件,并且在存在大量文件的情况下,最好只进行一次操作来决定它发送到哪个文件。
里>文件是根据需要创建的。 “选择器”函数决定下一个元素将到达哪个接收器,如果尚不存在,则使用“创建新接收器”函数创建它。
例如,如果源输出:8 4 7 1 5
接收器选择器是模块 3,则操作顺序为:
Create file 2
Add 8 to file 2
Create file 1
Add 4 to file 1
Add 7 to file 1
Add 1 to file 1
Add 5 to file 2
我正在考虑这样的调度程序类型:
dispatcherSink_ :: (Monad m) =>
(a -> k) -> -- sink selector
(k -> Sink a m ()) -> -- new sink
Sink a m ()
我尝试使用 evalStateC 和包含接收器映射的内部 StateT 来编写该函数,但我无法绑定(bind)类型。我不确定您是否可以使用同一个水槽两次。
我想做的事情可能吗?
我仍然是 Haskell 的新手,因此我们将不胜感激。
已编辑
我虽然可以创建 ResumableSinks 的 map ,Hackage 中有一个库,但它依赖于旧的且非常特定的 Conduit 版本,因此 cabal 无法安装它。 最后我没有找到一种方法可以用以前的类型编写函数,能够与任何接收器一起工作,所以我想出了一个直接与文件一起工作的函数:
import System.IO (hClose,openFile,IOMode(WriteMode))
import Conduit
import Data.IOData
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M
import Control.Monad.State.Strict
import Data.ByteString.Char8 (pack)
fileDispatcherSink ::
(MonadIO m, IOData c,Ord k) =>
(a -> k) ->
(a -> c) ->
(k -> FilePath) ->
Sink a m ()
fileDispatcherSink selector toChunked path =
evalStateC M.empty $ dispatcher
where
dispatcher = do
next <- await
m <- get
case next of
Nothing -> liftIO $ F.traverse_ hClose m
Just a -> do
let k = selector a
h <- case M.lookup k m of
Nothing -> do
nh <- liftIO $ openFile (path k) WriteMode
put $ M.insert k nh m
return nh
Just h -> return h
yield (toChunked a) $$ sinkHandle h
dispatcher
testSource :: (Monad m) => Source m Int
testSource = yieldMany [8, 4, 7, 1, 5]
main :: IO ()
main = testSource
$$ fileDispatcherSink (`mod` 3) (pack . show) ((++ ".txt") . show)
有没有办法编写_dispatcherSink__函数?
最佳答案
实现时存在概念问题
dispatcherSink_ :: (Monad m) =>
(a -> k) -> -- sink selector
(k -> Sink a m ()) -> -- new sink
Sink a m ()
。在管道中,数据从上游拉到下游,而不是推送。因此,Sink
决定是否从其上游管道请求下一个输入值。因此,您无法真正拥有一个 Sink
映射,读取输入值,然后将其提供给其中一个 Sink
。您选择的Sink
可能不会决定读取输入值,它可能会决定完成,然后您将如何处理输入值?您可以为该键创建一个新接收器,但它也可以决定不接受输入。
因此,您很可能需要一些不同的概念,而不是Sink
,您可以向其中推送值以及可以最终确定的内容。一个想法(未经测试):
data PushSink m i = PushSink { psPush :: i -> m (PushSink m i)
, psFinalize :: m () }
写入文件的实现会打开一个文件,保留句柄,而 psPush
只会将一个 block 写入文件,返回相同的对象,而 psFinalize
会关闭文件。
然后你可以实现这样的变体
dispatcherSink_ :: (Monad m) =>
(a -> k) -> -- sink selector
(k -> m (PushSink a m)) -> -- new sink
Sink a m ()
它将值推送到 PushSink
并在没有输入时最终确定它们。
关于haskell - Conduit - 分派(dispatch)到多个输出文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25046000/