我尝试实现了一个简单的基于 TCP 的协议(protocol),用于与 Akka Streams 交换消息(见下文)。但是,似乎传入 消息没有立即处理;也就是说,在客户端接连发送两条消息的场景中,第一条消息仅在从服务器发送某些内容后打印:
At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed
我期望/想看到的:
At t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed
我错过了什么?也许我需要以某种方式让两端的水槽变得急切?或者每个接收器是否以某种方式被相应的源阻塞(当源正在等待来自命令行的输入时)?
import java.nio.charset.StandardCharsets.UTF_8
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
object AkkaStreamTcpChatter extends App {
implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
implicit val materializer = ActorMaterializer()
type Message = String
val (host, port) = ("localhost", 46235)
val deserialize:ByteString => Message = _.utf8String
val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)
val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize
val protocol = BidiFlow.fromFlows(incoming, outgoing)
def prompt(s:String):Source[Message, _] = Source fromIterator {
() => Iterator.continually(StdIn readLine s"[$s]> ")
}
val print:Sink[Message, _] = Sink foreach println
args.headOption foreach {
case "server" => server()
case "client" => client()
}
def server():Unit =
Tcp()
.bind(host, port)
.runForeach { _
.flow
.join(protocol)
.runWith(prompt("S"), print)
}
def client():Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol)
.runWith(prompt("C"), print)
}
最佳答案
我认为问题在于 Akka Stream 确实 operator fusion .这意味着完整的流程处理在单个参与者上运行。当它阻止阅读您的消息时,它无法打印出任何内容。
解决方案是在您的源之后添加一个异步边界。请参阅下面的示例。
def server(): Unit =
Tcp()
.bind(host, port)
.runForeach {
_
.flow
.join(protocol)
.runWith(prompt("S").async, print) // note .async here
}
def client(): Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol).async
.runWith(prompt("C").async, print) // note .async here
当您添加异步边界时,融合不会跨越边界发生,并且 prompt
会在另一个 actor 上运行,因此不会阻止 print
显示任何内容.
关于scala - 如何使用 Akka Streams 实现一个简单的 TCP 协议(protocol)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36242183/