现在我正在开发一种以太网数据包处理库。 基本思想是数据包有两种不同的来源: 网络接口(interface)和 pcap 转储文件。数据包应该分组 按流,应过滤流,UDP流应由一个处理 方式,TCP方式等。我开发了没有管道的版本 但我发现现在有太多重复的代码,什么时候 我试图抽象,然后发明类似于管道的东西。 所以我尝试切换到导管,然后卡住了。
所以图片是这样的:
[UDP processing]
[pcap source] | / \
|---[flow map]-->* *->[dump to many files]
| \ /
[iface source] | [TCP processing]
第一个问题是流程图。应该积累 流以及流中的数据包何时多于 某个阈值 - 将其传递给处理。
第二个问题是我想要不同的 用于 UDP 和 TCP 处理的管道,因此管道应该 以某种方式 split 。
还有一个问题,所有这些东西应该是 多线程,所以生产者和消费者应该 在不同的线程中。
那么这张图中的导管应该是什么?
消息来源就是消息来源,这一点很明确。但应该是什么 流程图?一个水槽,为进一步的生产提供源泉 加工?流量数量巨大,所以不断积累 在进一步处理之前,内存中的所有数据包必须 避免。
有什么想法吗?同样,很清楚如何在没有 导管,所以问题是如何正确地设计它们。
更新。
data FlowFrame = FlowFrame { flowKey :: !F.FlowKey
, flowFrame :: [Packet]
}
data FlowState
flowFrames :: MonadIO m => Conduit Packet m FlowFrame
flowFrames = awaitForever $ \p -> do
let (Right (l3, _)) = runGet F.readL3Headers (pktData p)
let fk = F.flowKey l3
yield (FlowFrame fk [p])
sinkPrintFlow :: MonadIO m => Consumer FlowFrame m ()
sinkPrintFlow = awaitForever $ liftIO.putStrLn.show.pPrint.flowKey
isA :: F.Protocol -> FlowFrame -> Bool
isA p frame =
case ((flowKey frame)) of
F.FlowKey p _ _ -> True
_ -> False
processUDP :: MonadIO m => Conduit FlowFrame m FlowFrame
processUDP = CL.filter (isA F.PROTO_UDP)
processTCP :: MonadIO m => Conduit FlowFrame m FlowFrame
processTCP = CL.filter (isA F.PROTO_TCP)
main = do
(file:_) <- getArgs
input <- openOffline file
sourcePcap input
$$ flowFrames =$= void (sequenceConduits [processUDP, processTCP])
$= sinkPrintFlow
putStrLn "done"
最佳答案
如果您使用管道
,那么您可以使用Pipes.Extras 中的(+++)
组合器并排运行两个管道。它有这样的类型:
(+++)
:: Monad m
=> Pipe a c m r
-> Pipe b d m r
-> Pipe (Either a b) (Either c d) m r
那么你的程序将变成:
producer >-> (udpPipe +++ tcpPipe) >-> consumer
每次您希望生产者将值转发到 udpPipe
时,您都将该值包装在 Left
中,并且每次您希望将值转发到 >tcpPipe
将值包装在 Right
中。然后,下游消费者可以对其输入进行模式匹配,以判断它来自哪个管道。 Left
值来自 udpPipe
,Right
值来自 tcpPipe
。
编辑:请注意,这不需要并发。 (+++)
接受两个单线程管道并返回一个结合了它们逻辑的新单线程管道。
关于haskell - 在导管顶部设计一个库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24400806/