haskell - mapA 用于可能阻塞的流处理器(异步电路)

标签 haskell frp arrows

注:如果你还没有偶然发现异步电路,阅读我的另外两篇关于 Stack Overflow 的帖子(除了 John Hughes' article 他写下它们的地方)可能会非常方便:"An ArrowCircuit instance for stream processors which could block" , "Hughes' Fibonacci stream" .

John Hughes 在他著名的 "Generalising Monads to Arrows" 中为异步电路提出了以下类型。 :

data StreamProcessor a b = Get (a -> StreamProcessor a b) | 
                           Put b    (StreamProcessor a b) |
                           Halt

instance Category StreamProcessor where
    id = Get (\ x -> Put x id)
  
    Put c bc . ab = Put c (bc . ab)                          {- 1 -}
    Get bbc . Put b ab = (bbc b) . ab                        {- 2 -}
    Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)     {- 3 -}
    Get bbc . Halt = Halt                                    {- 4 -}
    Halt . ab = Halt                                         {- 5 -}

bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f)          = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f)    = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =      
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt             = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  first = bypass [] []

instance ArrowChoice StreamProcessor where
    left (Put b ab) = Put (Left b) (left ab)
    left (Get aab)  = Get $ \ a -> case a of 
                                       Left a' -> (left . aab) a'
                                       Right d -> Put (Right d) (left $ Get aab)
    left Halt = Halt
在他的另一篇论文 ( "Programming with arrows" ) 中,他写下了一个适用于箭头的组合器(好吧,ArrowChoice s),如 mapMMonads :
biteOff :: [a] -> Either [a] (a, [a])
biteOff []       = Left []
biteOff (x : xs) = Right (x, xs)

mapA :: ArrowChoice k => k a b -> k [a] [b] 
mapA k = let
            go :: ArrowChoice k => k a b -> k (a, [a]) [b]
            go k = k *** mapA k >>^ uncurry (:)
         in arr biteOff >>> (arr (const []) ||| go k)
长话短说,它不适用于 StreamProcessor s:
GHCi> Put x sp = Put [1, 2, 3] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow
GHCi> Put x sp = Put [] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow
有趣的是,不能决定的不仅仅是值,还有构造函数本身:
GHCi> Get f = Put [1, 2, 3] Halt >>> mapA id
GHCi> Put x sp = f ()
GHCi> x
*** Exception: stack overflow
所以,这就是我如何理解正在发生的事情:StreamProcessor有相当繁琐的构图规则。要组合两个箭头,我们通常必须知道两个构造函数。因此,当我们跌跌撞撞地进入一个有组合的无穷级数时,我们只希望{- 1 -} st 和 {- 5 -} (.) 的规则只会工作。我们不是很幸运:
  • mapA = arr biteOff >>> (arr (const []) ||| go k) .
  • arr biteOff >>> (arr (const []) ||| go k) = (arr (const []) ||| go k) . arr biteOff .
  • arr (const []) ||| go k = left (arr (const [])) >>> arr mirror >>> left (go k) >>> arr mirror >>> arr untag .见 (|||) , mirroruntag here .
  • >>>infixr .因此:left (arr (const [])) >>> (arr mirror >>> (left (go k) >>> (arr mirror >>> arr untag))) .
  • arr mirror >>> arr untagGet ,因为它们都是 Get s(规则 {- 3 -} 用于 (.))。
  • left (go k) >>> (arr mirror >>> arr untag) = (arr mirror >>> arr untag) . left (go k) = Get ... . left (go k) .规则来自 {- 2 -}通过 {- 4 -}在这里工作。可以看出,我们需要模式匹配 left (go k)现在。浏览 left告诉我们需要模式匹配 go k .
  • go k = k *** mapA k >>^ uncurry (:) .
  • k *** mapA k >>^ uncurry (:) = first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:)) .
  • first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:)) = ... . (first (mapA k) . ...) . first (mapA k)原来是一些从右到左组合的第一个参数(因为 .infixr ),因此它需要进行模式匹配。快速浏览 first显示我们需要模式匹配 mapA k .
  • 重来。

  • 所以,由于这不太奏效,我想写一些 mapSPStreamProcessor只。然而,现在这似乎不是一个好主意:
    比如说,我们申请了 mapSP给一些 sp .列表[a1, a2]是输入。 sp阻止 a1然后继续 a2 .我认为没有办法解决这个问题。
    那么,我对为什么通用 mapA 的理解是不适用于 StreamProcessor对吗?有没有mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]这将 加起来 ,不仅仅是 类型检查 ?

    最佳答案

    我认为您不会收到 mapA和你想要的一样一般,但我也不确定你为什么不正确的推理。在我看来,原因 mapA不能工作是因为 StreamProcessor造型严谨,mapA因为它的定义要求它的形状是惰性的。另一方面,您关于一个部分阻塞而另一个部分继续的论点实际上很好。
    要了解为什么这个“长度为 2 的列表”示例真的不是问题,我们只需查看 bypass .毕竟mapA f专门用于长度为 2 的列表无异于:

    mapA2 :: Arrow k => k a b -> k [a] [b] -- These lists must be length 2!
    mapA2 f = arr (\[a,b] -> (a,b)) >>> (f *** f) >>> arr (\(a,b) -> [a,b])
    
    现在,让我们构建一对 StreamProcessor s,那是一个 Get还有一个是 Put :
    putOnes :: StreamProcessor a Int
    putOnes = Put 1 putOnes
    
    sumTwo :: StreamProcessor Int Int
    sumTwo = Get $ \x -> Get $ \y -> Put (x+y) sumTwo
    
    并且将它们与*** 结合起来也没有问题。 :
    > runStreamProcessor (putOnes *** sumTwo) (zip (repeat 1) [1..10])
    [(1,3),(1,7),(1,11),(1,15),(1,19)]
    
    如果这没问题,那么显然 mapA2有效,这意味着 mapA不是因为Get惹麻烦s 和 Put s 并不总是匹配。 mapA只需要做一些同步。

    那么应该如何mapA工作?让我们考虑一下写 mapA putOnes 意味着什么.有人可能认为它应该只是一个 Put ,特别是考虑到 putOnes仅包含 Put s。但是,如果它没有 Get ,而不是包含在其中的内容 Put ?好像不管输入,输出到runStreamProcessor (mapA putOnes) anyOldList将是一个无限列表的无限列表!这绝对不是 mapA 想要的行为,但那是因为 mapA仅打算与同步信号功能一起运行。在同步的情况下,总是有一个输入到一个输出,这意味着我们道德上总是从 Get 开始。 , 任何超出输入列表长度的信号函数都可以安全地忽略。
    这给我们留下了两个“明智”选择 mapSP .
  • 首先,我们可以预先定义列表的长度。如果我们静态地知道长度,那么这等效于使用 ***并将列表编写为嵌套对(很像我在上面的 mapA2 中所做的)。
  • 其次,我们可以通过制作 mapSP 来强制系统同步。总是以 Get 开头并使用输入列表的长度来确定要做什么。从那里,我们继续缓存输入,就像 bypass作品:

  • mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
    mapSP sp = go (repeat (sp, []))
      where
        -- We keep track of each SP along with its own personal queue of inputs.
        -- We always start with a Get, to determine how many outputs to produce.
        go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
    
        addA a (s, lst) = (s, lst++[a])
    
        stepPuts sps rest = case stepPuts' sps of
          Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
          Nothing -> go (sps ++ rest)
    
        stepPuts' [] = Just []
        stepPuts' ((Get f, a:as):rest) = stepPuts' ((f a, as) : rest)
        stepPuts' ((Put x s, as):rest) = ((x, (s, as)):) <$> stepPuts' rest
        stepPuts' _ = Nothing
    
    请注意,在如何编写这个方面有很多开放的选择,其中大部分归结为我们想要保持多少异步。我们可以通过几个例子来探索这一点。让我们从 Hughes 的论文中提取一个开始:
    > runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[0,0,0],[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,0]]
    
    嗯,在 Hughes 的论文中,mapA (delay 0)充当“列延迟器”,生成类似 [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]] 的列表,但是这个 mapSP没有。为什么?这是因为stepPuts中的第一个案例,它告诉我们重复调用 stepPuts再次。这里的想法是,如果我们有一个 StreamProcessor每个输入产生多个输出,也许我们不想要 mapSP延迟这些输出。例如:
    putTwice :: StreamProcessor a a
    putTwice = Get $ \x -> Put x $ Put x putTwice
    
    > runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[1,2,3],[1,2,3],[4,5],[4,5],[6],[6],[7,8],[7,8],[9,10,11],[9,10,11],[12,13,14,15],[12,13,14,15]]
    
    如果我们想要一个更同步的语义,我们可以改变 stepPuts 的定义。到:
        stepPuts sps rest = case stepPuts' sps of
          Just (unzip -> (xs, sps')) -> Put xs $ go (sps' ++ rest)
          -- Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
          Nothing -> go (sps ++ rest)
    
    现在,我们恢复了像 Hughes 更同步的东西 mapA语义:
    > runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]]
    > runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[1,2,3],[1,2],[4],[4,5],[6,5,3],[6,8,11,15]]
    

    关于haskell - mapA 用于可能阻塞的流处理器(异步电路),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65732411/

    相关文章:

    reactive-programming - ReactiveX 被认为是响应式(Reactive)编程吗?

    haskell - 如何在 elerea 中统一两个或多个信号?

    haskell - 是 (map f) == concatMap (map f . ( :[]))?

    haskell - 小的 haskell libClang 例子?

    haskell - 在 Haskell 中使用 Dynamic/fromDynamic 时是否可以恢复约束?

    haskell - Sigma 中的限制类型

    angular - 如何在 agm-map 中制作自定义箭头标记?

    algorithm - Haskell 中全 1 的最大平方

    haskell - 为什么一些三便士-gui FRP 组合器在 MonadIO monad 上运行而不是纯粹的?

    haskell - haskell 中的箭头和函数有何不同?