注:如果你还没有偶然发现异步电路,阅读我的另外两篇关于 Stack Overflow 的帖子(除了 John Hughes' article 他写下它们的地方)可能会非常方便:"An ArrowCircuit instance for stream processors which could block" , "Hughes' Fibonacci stream" .
John Hughes 在他著名的 "Generalising Monads to Arrows" 中为异步电路提出了以下类型。 :
data StreamProcessor a b = Get (a -> StreamProcessor a b) |
Put b (StreamProcessor a b) |
Halt
instance Category StreamProcessor where
id = Get (\ x -> Put x id)
Put c bc . ab = Put c (bc . ab) {- 1 -}
Get bbc . Put b ab = (bbc b) . ab {- 2 -}
Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a) {- 3 -}
Get bbc . Halt = Halt {- 4 -}
Halt . ab = Halt {- 5 -}
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =
Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt = Halt
instance Arrow StreamProcessor where
arr f = Get $ \ a -> Put (f a) (arr f)
first = bypass [] []
instance ArrowChoice StreamProcessor where
left (Put b ab) = Put (Left b) (left ab)
left (Get aab) = Get $ \ a -> case a of
Left a' -> (left . aab) a'
Right d -> Put (Right d) (left $ Get aab)
left Halt = Halt
在他的另一篇论文 ( "Programming with arrows" ) 中,他写下了一个适用于箭头的组合器(好吧,ArrowChoice
s),如 mapM
为 Monads
:biteOff :: [a] -> Either [a] (a, [a])
biteOff [] = Left []
biteOff (x : xs) = Right (x, xs)
mapA :: ArrowChoice k => k a b -> k [a] [b]
mapA k = let
go :: ArrowChoice k => k a b -> k (a, [a]) [b]
go k = k *** mapA k >>^ uncurry (:)
in arr biteOff >>> (arr (const []) ||| go k)
长话短说,它不适用于 StreamProcessor
s:GHCi> Put x sp = Put [1, 2, 3] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow
GHCi> Put x sp = Put [] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow
有趣的是,不能决定的不仅仅是值,还有构造函数本身:GHCi> Get f = Put [1, 2, 3] Halt >>> mapA id
GHCi> Put x sp = f ()
GHCi> x
*** Exception: stack overflow
所以,这就是我如何理解正在发生的事情:StreamProcessor
有相当繁琐的构图规则。要组合两个箭头,我们通常必须知道两个构造函数。因此,当我们跌跌撞撞地进入一个有组合的无穷级数时,我们只希望{- 1 -}
st 和 {- 5 -}
(.)
的规则只会工作。我们不是很幸运:mapA
= arr biteOff >>> (arr (const []) ||| go k)
.arr biteOff >>> (arr (const []) ||| go k)
= (arr (const []) ||| go k) . arr biteOff
.arr (const []) ||| go k
= left (arr (const [])) >>> arr mirror >>> left (go k) >>> arr mirror >>> arr untag
.见 (|||)
, mirror
和 untag
here .>>>
是 infixr
.因此:left (arr (const [])) >>> (arr mirror >>> (left (go k) >>> (arr mirror >>> arr untag)))
.arr mirror >>> arr untag
是 Get
,因为它们都是 Get
s(规则 {- 3 -}
用于 (.)
)。left (go k) >>> (arr mirror >>> arr untag)
= (arr mirror >>> arr untag) . left (go k)
= Get ... . left (go k)
.规则来自 {- 2 -}
通过 {- 4 -}
在这里工作。可以看出,我们需要模式匹配 left (go k)
现在。浏览 left
告诉我们需要模式匹配 go k
.go k
= k *** mapA k >>^ uncurry (:)
.k *** mapA k >>^ uncurry (:)
= first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:))
.first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:))
= ... . (first (mapA k) . ...)
. first (mapA k)
原来是一些从右到左组合的第一个参数(因为 .
是 infixr
),因此它需要进行模式匹配。快速浏览 first
显示我们需要模式匹配 mapA k
.所以,由于这不太奏效,我想写一些
mapSP
为 StreamProcessor
只。然而,现在这似乎不是一个好主意:比如说,我们申请了
mapSP
给一些 sp
.列表[a1, a2]
是输入。 sp
阻止 a1
然后继续 a2
.我认为没有办法解决这个问题。那么,我对为什么通用
mapA
的理解是不适用于 StreamProcessor
对吗?有没有mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
这将 加起来 ,不仅仅是 类型检查 ?
最佳答案
我认为您不会收到 mapA
和你想要的一样一般,但我也不确定你为什么不正确的推理。在我看来,原因 mapA
不能工作是因为 StreamProcessor
造型严谨,mapA
因为它的定义要求它的形状是惰性的。另一方面,您关于一个部分阻塞而另一个部分继续的论点实际上很好。
要了解为什么这个“长度为 2 的列表”示例真的不是问题,我们只需查看 bypass
.毕竟mapA f
专门用于长度为 2 的列表无异于:
mapA2 :: Arrow k => k a b -> k [a] [b] -- These lists must be length 2!
mapA2 f = arr (\[a,b] -> (a,b)) >>> (f *** f) >>> arr (\(a,b) -> [a,b])
现在,让我们构建一对 StreamProcessor
s,那是一个 Get
还有一个是 Put
:putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes
sumTwo :: StreamProcessor Int Int
sumTwo = Get $ \x -> Get $ \y -> Put (x+y) sumTwo
并且将它们与***
结合起来也没有问题。 :> runStreamProcessor (putOnes *** sumTwo) (zip (repeat 1) [1..10])
[(1,3),(1,7),(1,11),(1,15),(1,19)]
如果这没问题,那么显然 mapA2
有效,这意味着 mapA
不是因为Get
惹麻烦s 和 Put
s 并不总是匹配。 mapA
只需要做一些同步。那么应该如何
mapA
工作?让我们考虑一下写 mapA putOnes
意味着什么.有人可能认为它应该只是一个 Put
,特别是考虑到 putOnes
仅包含 Put
s。但是,如果它没有 Get
,而不是包含在其中的内容 Put
?好像不管输入,输出到runStreamProcessor (mapA putOnes) anyOldList
将是一个无限列表的无限列表!这绝对不是 mapA
想要的行为,但那是因为 mapA
仅打算与同步信号功能一起运行。在同步的情况下,总是有一个输入到一个输出,这意味着我们道德上总是从 Get
开始。 , 任何超出输入列表长度的信号函数都可以安全地忽略。这给我们留下了两个“明智”选择
mapSP
.***
并将列表编写为嵌套对(很像我在上面的 mapA2
中所做的)。 mapSP
来强制系统同步。总是以 Get
开头并使用输入列表的长度来确定要做什么。从那里,我们继续缓存输入,就像 bypass
作品:mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSP sp = go (repeat (sp, []))
where
-- We keep track of each SP along with its own personal queue of inputs.
-- We always start with a Get, to determine how many outputs to produce.
go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
addA a (s, lst) = (s, lst++[a])
stepPuts sps rest = case stepPuts' sps of
Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
Nothing -> go (sps ++ rest)
stepPuts' [] = Just []
stepPuts' ((Get f, a:as):rest) = stepPuts' ((f a, as) : rest)
stepPuts' ((Put x s, as):rest) = ((x, (s, as)):) <$> stepPuts' rest
stepPuts' _ = Nothing
请注意,在如何编写这个方面有很多开放的选择,其中大部分归结为我们想要保持多少异步。我们可以通过几个例子来探索这一点。让我们从 Hughes 的论文中提取一个开始:> runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[0,0,0],[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,0]]
嗯,在 Hughes 的论文中,mapA (delay 0)
充当“列延迟器”,生成类似 [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]]
的列表,但是这个 mapSP
没有。为什么?这是因为stepPuts
中的第一个案例,它告诉我们重复调用 stepPuts
再次。这里的想法是,如果我们有一个 StreamProcessor
每个输入产生多个输出,也许我们不想要 mapSP
延迟这些输出。例如:putTwice :: StreamProcessor a a
putTwice = Get $ \x -> Put x $ Put x putTwice
> runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[1,2,3],[1,2,3],[4,5],[4,5],[6],[6],[7,8],[7,8],[9,10,11],[9,10,11],[12,13,14,15],[12,13,14,15]]
如果我们想要一个更同步的语义,我们可以改变 stepPuts
的定义。到: stepPuts sps rest = case stepPuts' sps of
Just (unzip -> (xs, sps')) -> Put xs $ go (sps' ++ rest)
-- Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
Nothing -> go (sps ++ rest)
现在,我们恢复了像 Hughes 更同步的东西 mapA
语义:> runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]]
> runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[1,2,3],[1,2],[4],[4,5],[6,5,3],[6,8,11,15]]
关于haskell - mapA 用于可能阻塞的流处理器(异步电路),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65732411/