haskell - 连接管道与返回不同值的消费者和生产者

标签 haskell haskell-pipes

我正在使用管道生态系统编写一个流函数,特别是管道并发,它基于操作库,使我能够快速制作小程序片段,我可以通过网络向服务器或向服务器发出命令。 stdin/out shell 命令,然后读回响应。在本例中它是星号,但可以概括为任何类似的东西。

我最初写这个是为了考虑管道,但它不起作用。以下代码不起作用的原因是 astPipe 返回 Pipe _ _ IO a,而来自 Pipes-concurrency 的 i 和 o 都返回 Consumer/Producer _ IO ()。我想过让 astPipe 产生 Maybe ByteString,然后让输出 Consumer 消耗 Maybe ByteString,但这仍然是没有解决 Producer 返回 () 的问题。

我觉得我真的很接近解决方案,但我还不能完全找到它。您应该能够仅在此文件上运行 stack 来进行复制。

#!/usr/bin/env stack
-- stack --resolver lts-6.20 runghc --package pipes  --package pipes-concurrency  --package operational --package process-streaming

{-# LANGUAGE OverloadedStrings, LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
module West.Asterisk where

import System.Process.Streaming as PS
import Control.Monad.Operational as Op

import Pipes as P
import Pipes.Concurrent as PC;

import qualified Data.ByteString.Char8 as B

import Control.Concurrent.Async

import GHC.IO.Exception (ExitCode)

data Version = Version String
data Channels = Channels

data AsteriskInstruction a where
    Login :: AsteriskInstruction (Maybe Version)
    CoreShowChannels :: AsteriskInstruction (Maybe Channels)

type Asterisk a = Program AsteriskInstruction a

runAsterisk :: forall a. Asterisk a -> IO a
runAsterisk m = 
  let

    runAsterisk' :: Producer B.ByteString {- TODO Response -} IO () -> Consumer B.ByteString IO () -> Asterisk a -> IO a
    runAsterisk' i o m' = runEffect $ i >-> astPipe m' >-> o
      where
        astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
        astPipe k = 
          case Op.view m' of

            Return a -> return a

            Login :>>= k -> do
              yield logincmd
              resp <- await -- :: Response
              let v = undefined resp :: Maybe Version
              astPipe (k v)

            CoreShowChannels :>>= k -> do
              yield coreshowchannelscmd
              resp <- await
              let c = undefined resp :: Maybe Channels
              astPipe (k c)

  in do
    withSpawn unbounded $ \(out1, in1) -> do
        async $ asteriskManager (fromInput in1) (toOutput out1)
        runAsterisk' (fromInput in1) (toOutput out1) m 

asteriskManager :: Producer B.ByteString IO () -> Consumer B.ByteString IO () -> IO ExitCode
asteriskManager prod cons = do
  let ssh = shell "nc someserver 5038"
  execute (piped ssh) (foldOut (withConsumer cons) *> feedProducer prod *> exitCode)


logincmd, coreshowchannelscmd :: B.ByteString
logincmd = "action: login\nusername: username\nsecret: pass\nevents: off\n\n"
coreshowchannelscmd = "action: coreshowchannels\n\n"

错误:

  Blah.hs:38:45:
    Couldn't match type ‘a’ with ‘()’
      ‘a’ is a rigid type variable bound by
          the type signature for runAsterisk :: Asterisk a -> IO a
          at Blah.hs:33:23
    Expected type: Proxy () B.ByteString () B.ByteString IO ()
      Actual type: Pipe B.ByteString B.ByteString IO a
    Relevant bindings include
      astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
        (bound at Blah.hs:41:9)
      m' :: Asterisk a (bound at Blah.hs:38:22)
      runAsterisk' :: Producer B.ByteString IO ()
                      -> Consumer B.ByteString IO () -> Asterisk a -> IO a
        (bound at Blah.hs:38:5)
      m :: Asterisk a (bound at Blah.hs:34:13)
      runAsterisk :: Asterisk a -> IO a (bound at Blah.hs:34:1)
    In the second argument of ‘(>->)’, namely ‘astPipe m'’
    In the first argument of ‘(>->)’, namely ‘i >-> astPipe m'’

最佳答案

返回 ()

生产者消费者可以自行停止。返回类型多态的生产者消费者永远不会自行停止。

要统一您的情况下的返回类型,请使用 fmap 将每个返回类型放入 Either 的不同分支中。

runAsterisk' :: Producer B.ByteString IO () 
             -> Consumer B.ByteString IO () 
             -> Asterisk a 
             -> IO (Either () a)
runAsterisk' i o m' = runEffect $ fmap Left i >-> fmap Right (astPipe m') >-> fmap Left o

Either 上的模式匹配将揭示哪个组件停止了管道。

此外,您还可以使用 drainConsumer a IO () 转换为永不自行停止的消费者:

neverStop :: Consumer a IO () -> Consumer a IO r
neverStop consumer = consumer *> drain

原始消费者停止后收到的所有输入都将被丢弃。

关于haskell - 连接管道与返回不同值的消费者和生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39854497/

相关文章:

haskell - MultiParamTypeClasses 的歧义问题

haskell - 如何检查 bool 列表是否包含值?

haskell - 这种请求-响应类型是否有标准抽象?

System T Combinator 语言的 Haskell 解释器

c - 如何为递归类型创建 Storable 实例?

c++ - 为什么 Haskell 对于简单的斐波那契比 C++ 更快

haskell - 在 Haskell 中使用 Pipes 读写二进制数据

csv - 使用 pipes-csv 从 csv 文件中读取第一行

haskell - 将 StateT IO monad 转换为使用 Control.Proxy - 哪个是服务器,哪个是客户端?

haskell - 将字节流传输到网络 websocket