scala - 使用 Akka 的简单服务器推送广播流程

标签 scala akka akka-stream akka-http

我正在努力实现一个 - 相当简单 - Akka 流程。
这是我认为我需要的:

akka-flow

我有一个服务器和 n 个客户端,希望能够通过向客户端广播消息 (JSON) 来对外部事件使用react。客户可以随时注册/注销。

例如:

  • 1 位客户已注册
  • 服务器抛出一个事件(“Hello World!”)
  • 服务器广播“Hello World!”给所有客户(一位客户)
  • 一个新客户端打开一个 websocket 连接
  • 服务器抛出另一个事件(“Hello Akka!”)
  • 服务器广播“Hello Akka!”给所有客户(两个客户)

  • 这是我到目前为止所拥有的:

    def route: Route = {
       val register = path("register") {
         // registration point for the clients
         handleWebSocketMessages(serverPushFlow)
       }
    }
    
    // ...
    
    def broadcast(msg: String): Unit = {
      // use the previously created flow to send messages to all clients
      // ???
    }
    
    // my broadcast sink to send messages to the clients
    val broadcastSink: Sink[String, Source[String, NotUsed]] = BroadcastHub.sink[String]
    
    // a source that emmits simple strings
    val simpleMsgSource = Source(Nil: List[String])
    
    def serverPushFlow = {
      Flow[Message].mapAsync(1) {
        case TextMessage.Strict(text) =>       Future.successful(text)
        case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
      }
      .via(Flow.fromSinkAndSource(broadcastSink, simpleMsgSource))
      .map[Message](string => TextMessage(string))
    }
    

    最佳答案

    为了能够使用broadcastHub,您必须定义两个流。一个运行你的 websocket TextMessagebroadcastHub .您必须运行它,它会生成一个连接到每个客户端的源。

    这是在简单的可运行应用程序中描述的这个概念。

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{BroadcastHub, Sink, Source}
    import org.slf4j.LoggerFactory
    
    import scala.concurrent.duration._
    
    object BroadcastSink extends App {
    
      private val logger = LoggerFactory.getLogger("logger")
    
      implicit val actorSystem = ActorSystem()
      implicit val actorMaterializer = ActorMaterializer()
    
      val broadcastSink: Sink[String, Source[String, NotUsed]] =
        BroadcastHub.sink[String]
    
      val simpleMsgSource = Source.tick(500.milli, 500.milli, "Single Message")
    
      val sourceForClients: Source[String, NotUsed] = simpleMsgSource.runWith(broadcastSink)
    
      sourceForClients.to(Sink.foreach(t => logger.info(s"Client 1: $t"))).run()
      Thread.sleep(1000)
    
      sourceForClients.to(Sink.foreach(t => logger.info(s"Client 2: $t"))).run()
      Thread.sleep(1000)
    
      sourceForClients.to(Sink.foreach(t => logger.info(s"Client 3: $t"))).run()
      Thread.sleep(1000)
    
      sourceForClients.to(Sink.foreach(t => logger.info(s"Client 4: $t"))).run()
      Thread.sleep(1000)
    
      actorSystem.terminate()
    }
    

    打印
    10:52:01.774 Client 1: Single Message
    10:52:02.273 Client 1: Single Message
    10:52:02.273 Client 2: Single Message
    10:52:02.773 Client 2: Single Message
    10:52:02.773 Client 1: Single Message
    10:52:03.272 Client 3: Single Message
    10:52:03.272 Client 2: Single Message
    10:52:03.272 Client 1: Single Message
    10:52:03.772 Client 1: Single Message
    10:52:03.772 Client 3: Single Message
    10:52:03.773 Client 2: Single Message
    10:52:04.272 Client 2: Single Message
    10:52:04.272 Client 4: Single Message
    10:52:04.272 Client 1: Single Message
    10:52:04.273 Client 3: Single Message
    10:52:04.772 Client 1: Single Message
    10:52:04.772 Client 2: Single Message
    10:52:04.772 Client 3: Single Message
    10:52:04.772 Client 4: Single Message
    10:52:05.271 Client 4: Single Message
    10:52:05.271 Client 1: Single Message
    10:52:05.271 Client 3: Single Message
    10:52:05.272 Client 2: Single Message
    

    如果事先知道客户,则不需要 BrodacastHub,可以使用 alsoTo方法:
      def webSocketHandler(clients: List[Sink[Message, NotUsed]]): Flow[Message, Message, Any] = {
        val flow = Flow[Message]
        clients.foldLeft(flow) {case (fl, client) =>
          fl.alsoTo(client)
        }
      }
    

    关于scala - 使用 Akka 的简单服务器推送广播流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54687116/

    相关文章:

    java - Akka Actor (Scala) 如何获取内存不足的堆转储

    akka - 使用Kotlin和Akka-TestKit测试Akka Actor

    scala - 多个 http 请求的 Akka 流程

    scala - 如何以惯用的方式向 Akka 流添加错误日志记录?

    scala - 如何一次通过多个谓词过滤列表?

    scala - 自类型可以与抽象类型一起使用吗?

    scala - 如何理解以下 scala 调用

    scala - Play Scala Akka WebSockets 更改 Actor 路径

    scala - 通过连接池发出 http 请求时 Akka Flow 挂起

    scala - 使用 Scala 将 org.apache.spark.mllib.linalg.Vector RDD 转换为 Spark 中的 DataFrame