这个问题是关于 Haskell 库的 Pipes .
此问题与2019 Advent of Code有关Day 11 (可能剧透警告)
我有两个 Pipe Int Int m r
brain
和 robot
需要在连续循环中相互传递信息。那就是 brain
的输出需要去robot
的输入和 robot
的输出需要去brain
的输入.当brain
完成了我需要计算的结果。
我如何作曲 brain
和 robot
进入循环?理想情况下是类型为 Effect m r
的循环我可以传递给 runEffect
编辑:结果应该是这样的:
+-----------+ +-----------+
| | | |
| | | |
a ==> f ==> b ==> g ==> a=|
^ | | | | |
| | | | | | | |
| +-----|-----+ +-----|-----+ |
| v v |
| () r |
+=====================================+
最佳答案
答案
最简单的解决方案是使用 Client
和 Server
正如 danidiaz 在评论中建议的那样,因为 pipes
没有任何对循环管道的内置支持,如果正确地这样做的话,这将是非常困难的。这主要是因为我们需要处理await
数量的情况。 s 与 yield
的数量不匹配s。
编辑:我添加了一个关于其他答案问题的部分。请参阅“另一个有问题的替代方案”部分
编辑 2:我在下面添加了一个问题较少的可能解决方案。请参阅“可能的解决方案”部分
有问题的替代方案
然而,可以在 Proxy
的帮助下模拟它。框架(带有 Client
和 Server
)和简洁的功能 generalize
,它变成了一个单向 Pipe
成双向Proxy
.
generalize f x0
+-----------+ +---------------------+
| | | |
| | x <======================== x
a ==> f ==> b becomes | |
| | a ==> f ==> b
| | | | |
+-----|-----+ +----------|----------+
v v
r r
现在我们可以使用
//>
和 >\\
塞住末端并使流动循环:loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\\ generalize p x0 //> pure
有这个形状
loop f
a
+-----|-----+
| | |
/====<=======/===<========\
| | | |
\=> a ==> f ==> a ==/
| |
+-----|-----+
v
r
如您所见,我们需要为
a
输入一个初始值。 .这是因为无法保证管道不会await
在它屈服之前,这将迫使它永远等待。但请注意,此 会扔掉数据如果管道
yield
s 之前多次 await
ing,因为 generalize 是使用 state monad 在内部实现的,该状态 monad 在产生时保存最后一个值并在等待时检索最后一个值。用法(有问题的想法)
要将它与您的管道一起使用,只需将它们组合起来并将它们交给
loop
:runEffect $ loop (f >-> g)
但是请不要使用它,因为如果您不小心它会随机丢弃数据
另一个有问题的选择
你也可以像mingmingrr建议的那样制作一个懒惰的无限管道链
infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f
这解决了丢弃/重复值的问题,但还有其他几个问题。首先,在让步之前先等待会导致无限循环使用无限内存,但这已经在 mingmingrr 的回答中解决了。
另一个更难解决的问题是,在相应的 yield 之前的每个 action 都会为每个 await 重复一次。如果我们修改他们的示例以记录正在发生的事情,我们可以看到这一点:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
lift . putStrLn $ "Awaiting. n = " ++ show n
x <- await
lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
yield (x + 1)
g (n - 1)
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
现在,正在运行
runEffect (cyclic' 0 >-> P.print)
将打印以下内容:Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63
如您所见,对于每个
await
,我们重新执行了一切,直到对应的 yield
.更具体地说,等待触发管道的新副本运行,直到达到产量。当我们再次等待时,副本将运行直到下一次再次产生并且如果它触发 await
在此期间,它将创建另一个副本并运行它直到第一个产量,依此类推。这意味着在最好的情况下,我们得到
O(n^2)
而不是线性性能(并使用 O(n)
而不是 O(1)
内存),因为我们正在为每个 Action 重复所有内容。在最坏的情况下,例如如果我们正在读取或写入文件,我们可能会得到完全错误的结果,因为我们正在重复副作用。一个可能的解决方案
如果你真的必须使用
Pipe
s 并且不能使用 request
/respond
相反,您确信您的代码永远不会await
超过(或之前)它yield
s(或者在这些情况下有一个很好的默认值),我们可以建立在我之前的上述尝试的基础上,制定一个解决方案,至少可以处理 yield
时的情况。比你多await
.诀窍是为
generalize
的实现添加一个缓冲区。 ,所以多余的值会被存储而不是被丢弃。我们还可以将额外参数保留为缓冲区为空时的默认值。import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence
generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\\ hoist lift p //> dn
where
up () = do
x <- lift $ state (takeHeadDef x0)
request x
dn a = do
x <- respond a
lift $ modify (Seq.|> x)
takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)
如果我们现在将其插入我们对
loop
的定义中,我们将解决丢弃多余值的问题(以保持缓冲区的内存为代价)。它还可以防止复制除默认值之外的任何值,并且仅在缓冲区为空时使用默认值。loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\\ generalize' p x0 //> pure
如果我们想要
await
之前yield
如果是错误,我们可以简单地给出 error
作为我们的默认值:loop' (error "Await without yield") somePipe
.TL; 博士
使用
Client
和 Server
来自 Pipes.Core
.它将解决您的问题,而不会导致大量奇怪的错误。如果这是不可能的,我的“可能的解决方案”部分带有
generalize
的修改版本|在大多数情况下应该完成这项工作。
关于haskell - 在haskell中将管道组合成一个循环或循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59280156/