playframework - 基于 "resource"连接的客户端正在处理的 BroadcastHub 过滤?

标签 playframework websocket akka broadcast akka-stream

我正在编写一个纯 websocket web 应用程序,这意味着在 websocket 升级之前没有用户/客户端步骤,更具体地说:
身份验证请求像其他通信一样通过 websockets

有/有:

  • /api/ws
  • 上正好有一个 websocket 端点
  • 连接到该端点的多个客户端
  • 多个客户的多个项目

  • 现在,并不是每个客户端都可以访问每个项目——访问控制是在服务器端(ofc)实现的,与 websocket 本身无关。

    我的问题是,我想允许协作,这意味着 N 个客户可以一起处理 1 个项目。

    现在,如果其中一个客户修改了某些内容,我想通知所有其他正在从事该项目的客户。

    这一点尤其重要,因为 atm 我是唯一一个致力于此并对其进行测试的人,这是我这边的主要疏忽,因为现在:

    如果客户端 A 连接到项目 X 并且客户端 B 连接到项目 Y,如果其中任何一个更新了各自项目中的某些内容,则另一个会收到这些更改的通知。

    现在我的 WebsocketController 相当简单,我基本上有这个:

    private val fanIn = MergeHub.source[AllowedWSMessage].to(sink).run()
    private val fanOut = source.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()
    
    def handle: WebSocket = WebSocket.accept[AllowedWSMessage, AllowedWSMessage]
    {
      _ => Flow.fromSinkAndSource(fanIn, fanOut)
    }
    

    现在根据我的理解,我需要的是

    1) 每个项目有多个 websocket 端点,例如/api/{project_identifier}/ws

    (X)或

    2)根据他们正在工作的项目拆分WebSocket连接/连接的客户端的一些方法。

    因为我不想走路线1)我将分享我对2)的想法:

    我目前看不到解决方法的问题是,我可以轻松地在服务器端创建一些集合,在其中存储在任何给定时刻哪个用户正在处理哪个项目(例如,如果他们选择/切换一个项目,客户端将其发送到服务器并存储此信息)

    但我还有那个 fanOut ,所以这不能解决我关于 WebSocket/AkkaStreams 的问题。

    BroadcastHub 上是否可以调用一些魔法(过滤)?那是我想要的吗?

    编辑:在尝试但未能应用@James Roper 的良好提示之后,现在在这里分享我的整个 websocket 逻辑:

     class WebSocketController @Inject()(implicit cc: ControllerComponents, ec: ExecutionContext, system: ActorSystem, mat: Materializer) extends AbstractController(cc)
    

    {
    val 记录器:记录器 = 记录器(this.getClass())

    type WebSocketMessage = Array[Byte]
    
    import scala.concurrent.duration._
    
    val tickingSource: Source[WebSocketMessage, Cancellable] =
      Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
        .map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)
    
    private val generalActor = system.actorOf(Props
    {
      new myActor(system, "generalActor")
    }, "generalActor")
    
    private val serverMessageSource = Source
      .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
      .mapMaterializedValue
      { queue => generalActor ! InitTunnel(queue) }
    
    private val sink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(generalActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
    private val source: Source[WebSocketMessage, Cancellable] = tickingSource.merge(serverMessageSource)
    
    private val fanIn = MergeHub.source[WebSocketMessage].to(sink).run()
    private val fanOut = source.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()
    
    // TODO switch to WebSocket.acceptOrResult
    def handle: WebSocket = WebSocket.accept[WebSocketMessage, WebSocketMessage]
      {
        //_ => createFlow()
        _ => Flow.fromSinkAndSource(fanIn, fanOut)
      }
    
    private val projectHubs = TrieMap.empty[String, (Sink[WebSocketMessage, NotUsed], Source[WebSocketMessage, NotUsed])]
    
    private def buildProjectHub(projectName: String) =
    {
      logger.info(s"building projectHub for $projectName")
    
      val projectActor = system.actorOf(Props
      {
        new myActor(system, s"${projectName}Actor")
      }, s"${projectName}Actor")
    
      val projectServerMessageSource = Source
        .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
        .mapMaterializedValue
        { queue => projectActor ! InitTunnel(queue) }
    
      val projectSink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(projectActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
      val projectSource: Source[WebSocketMessage, Cancellable] = tickingSource.merge(projectServerMessageSource)
    
      val projectFanIn = MergeHub.source[WebSocketMessage].to(projectSink).run()
      val projectFanOut = projectSource.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()
    
      (projectFanIn, projectFanOut)
    }
    
    private def getProjectHub(userName: String, projectName: String): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
    {
      logger.info(s"trying to get projectHub for $projectName")
    
      val (sink, source) = projectHubs.getOrElseUpdate(projectName, {
        buildProjectHub(projectName)
      })
    
      Flow.fromSinkAndSourceCoupled(sink, source)
    }
    
    private def extractUserAndProject(msg: WebSocketMessage): (String, String) =
    {
      Wrapper.parseFrom(msg).`type` match
      {
        case m: MessageType =>
          val message = m.value
          (message.userName, message.projectName)
        case _ => ("", "")
      }
    }
    
    private def createFlow(): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
    {
      // broadcast source and sink for demux/muxing multiple chat rooms in this one flow
      // They'll be provided later when we materialize the flow
      var broadcastSource: Source[WebSocketMessage, NotUsed] = null
      var mergeSink: Sink[WebSocketMessage, NotUsed] = null
    
      Flow[WebSocketMessage].map
      {
        m: WebSocketMessage =>
        val msg = Wrapper.parseFrom(m)
        logger.warn(s"client sent project related message: ${msg.toString}");
        m
      }.map
        {
          case isProjectRelated if !extractUserAndProject(isProjectRelated)._2.isEmpty =>
            val (userName, projectName) = extractUserAndProject(isProjectRelated)
    
            logger.info(s"userName: $userName, projectName: $projectName")
            val projectFlow = getProjectHub(userName, projectName)
    
            broadcastSource.filter
            {
              msg =>
                val (_, project) = extractUserAndProject(msg)
                logger.info(s"$project == $projectName")
                (project == projectName)
            }
              .via(projectFlow)
              .runWith(mergeSink)
    
            isProjectRelated
    
          case other =>
          {
            logger.info("other")
            other
          }
        } via {
          Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[WebSocketMessage], MergeHub.source[WebSocketMessage])
          {
            (source, sink) =>
              broadcastSource = source
              mergeSink = sink
    
              source.filter(extractUserAndProject(_)._2.isEmpty)
                .map
                { x => logger.info("Non project related stuff"); x }
                .via(Flow.fromSinkAndSource(fanIn, fanOut))
                .runWith(sink)
    
              NotUsed
          }
        }
    }
    

    }

    解决方案/想法我是如何理解的:

    1) 我们有一个“包装流程”,其中我们有一个为空的 broadcastSource 和 mergeSink,直到我们在外部 } via { 中实现它们。堵塞

    2)在那个“包装流程”中,我们映射每个元素来检查它。

    I) 如果是项目相关的,我们

    a) 为项目获取/创建自己的子流程
    b) 根据项目名称过滤元素
    c)让通过过滤器的那些被子/项目流消耗,以便连接到项目的每个人都获得该元素

    II)如果它与项目无关,我们只是传递它

    3) 我们的包装流程是“按需”物化流程,在 via在具体化的地方,我们让与项目无关的元素分发到所有连接的 Web 套接字客户端。

    总结一下:我们有一个用于 websocket 连接的“包装流”,它可以通过 projectFlow 或 generalFlow 进行,具体取决于它正在使用的消息/元素。

    我现在的问题是(这似乎是微不足道的,但我在不知何故苦苦挣扎)每条消息都应该进入 myActor (atm) 并且也应该有消息从那里发出(参见 serverMesssageSourcesource )

    但是上面的代码正在创建不确定的结果,例如一个客户端发送 2 条消息,但有 4 条正在处理(根据服务器发回的日志和结果),有时消息在从 Controller 到参与者的途中突然丢失。

    我无法解释,但如果我只留下 _ => Flow.fromSinkAndSource(fanIn, fanOut)每个人都得到了一切,但至少如果只有一个客户,它会完全符合预期(显然:))

    最佳答案

    我实际上建议使用 Play 的 socket.io support .这提供了命名空间,从你的描述中我可以看出,它可以直接实现你想要的——每个命名空间都是它自己独立管理的流,但所有命名空间都使用同一个 WebSocket。我 wrote a blog post关于您今天可能选择使用 socket.io 的原因。

    如果您不想使用 socket.io,我在这里有一个示例(它使用 socket.io,但不使用 socket.io 命名空间,因此可以很容易地适应在直接 WebSockets 上运行),它显示了一个多聊天房间协议(protocol) - 它将消息馈送到 BroadcastHub,然后用户当前所在的每个聊天室都有一个订阅中心(对您来说,每个项目都有一个订阅)。这些订阅中的每一个都会过滤来自中心的消息以仅包含该订阅聊天室的消息,然后将消息馈送到该聊天室 MergeHub。

    如果您可以将 WebSocket 连接调整为 ChatEvent 的流,则此处突出显示的代码根本不是特定于 socket.io 的。 ,您可以按原样使用它:

    https://github.com/playframework/play-socket.io/blob/c113e74a4d9b435814df1ccdc885029c397d9179/samples/scala/multi-room-chat/app/chat/ChatEngine.scala#L84-L125

    为了满足您通过每个人都连接到的广播 channel 引导非项目特定消息的要求,首先,创建该 channel :

    val generalFlow = {
      val (sink, source) = MergeHub.source[NonProjectSpecificEvent]
        .toMat(BroadcastHub.sink[NonProjectSpecificEvent])(Keep.both).run
      Flow.fromSinkAndSourceCoupled(sink, source)
    }
    

    然后,当每个连接的 WebSocket 的广播接收器/源连接时,附加它(这是来自聊天示例:
    } via {
      Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[YourEvent], MergeHub.source[YourEvent]) { (source, sink) =>
        broadcastSource = source
        mergeSink = sink
    
        source.filter(_.isInstanceOf[NonProjectSpecificEvent])
          .via(generalFlow)
          .runWith(sink)
    
        NotUsed
      }
    }
    

    关于playframework - 基于 "resource"连接的客户端正在处理的 BroadcastHub 过滤?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46510605/

    相关文章:

    scala - 在 Play 中,每个请求都会产生一个 Akka Actor 吗?

    java - 使用 Play 表单对象检查表单字段是否已更改

    forms - 如何比较 Play Framework 2 表单中的枚举?

    playframework - 如何在非 Play 应用程序中使用 Play Framework 配置库命令行参数

    java - Jetty Websocket 服务器在本地工作,但远程连接失败并出现 "Host is down"错误,如何解决?

    scala - Spray/Akka 缺失隐式

    python - 使用 channel 2 向一位用户发送通知

    python - 诊断间歇性/"lagged"Websocket数据收集

    java - 获取加载 Spring 上下文的监听器

    java - 无法解析对值 :$akka. 的替换 stream-blocking io dispatcher