scala - 如何使用 Akka Streams 实现一个简单的 TCP 协议(protocol)?

标签 scala tcp akka akka-stream

我尝试实现了一个简单的基于 TCP 的协议(protocol),用于与 Akka Streams 交换消息(见下文)。但是,似乎传入 消息没有立即处理;也就是说,在客户端接连发送两条消息的场景中,第一条消息仅在从服务器发送某些内容后打印:

At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed

我期望/想看到的:

At t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed

我错过了什么?也许我需要以某种方式让两端的水槽变得急切?或者每个接收器是否以某种方式被相应的源阻塞(当源正在等待来自命令行的输入时)?

import java.nio.charset.StandardCharsets.UTF_8

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

object AkkaStreamTcpChatter extends App {
  implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
  implicit val materializer = ActorMaterializer()

  type Message = String
  val (host, port) = ("localhost", 46235)

  val deserialize:ByteString => Message = _.utf8String
  val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)

  val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
  val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize

  val protocol = BidiFlow.fromFlows(incoming, outgoing)

  def prompt(s:String):Source[Message, _] = Source fromIterator {
    () => Iterator.continually(StdIn readLine s"[$s]> ")
  }

  val print:Sink[Message, _] = Sink foreach println

  args.headOption foreach {
    case "server" => server()
    case "client" => client()
  }

  def server():Unit =
    Tcp()
      .bind(host, port)
      .runForeach { _
        .flow
        .join(protocol)
        .runWith(prompt("S"), print)
      }

  def client():Unit =
    Tcp()
      .outgoingConnection(host, port)
      .join(protocol)
      .runWith(prompt("C"), print)
}

最佳答案

我认为问题在于 Akka Stream 确实 operator fusion .这意味着完整的流程处理在单个参与者上运行。当它阻止阅读您的消息时,它无法打印出任何内容。

解决方案是在您的源之后添加一个异步边界。请参阅下面的示例。

def server(): Unit =
  Tcp()
    .bind(host, port)
    .runForeach {
      _
        .flow
        .join(protocol)
        .runWith(prompt("S").async, print) // note .async here
    }

def client(): Unit =
  Tcp()
    .outgoingConnection(host, port)
    .join(protocol).async
    .runWith(prompt("C").async, print) // note .async here

当您添加异步边界时,融合不会跨越边界发生,并且 prompt 会在另一个 actor 上运行,因此不会阻止 print 显示任何内容.

关于scala - 如何使用 Akka Streams 实现一个简单的 TCP 协议(protocol)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36242183/

相关文章:

scala - future 不完整?

java - Spring Boot 和 Akka 集群 Actor 依赖注入(inject)不起作用

tcp - RTOS 通过 TCP 连接到本地服务器,但不连接到远程服务器

tcp - MQTT 如何在防火墙后工作?

scala - PersistentFSM 参与者中的 Stackoverflow 异常

scala - 如何在 akka 中使用 "ask"作为三个值

java - 为什么我需要在Java程序中创建一个类?

scala - 将命令行参数传递给 SBT 以设置 TypeSafe 配置键值

scala - 类型参数化和奇怪的转换异常

tcp - Icecast 传输层协议(protocol) - TCP 还是 UDP?