scala - 将 SSE 与 Redis 发布/订阅和 Akka Streams 一起使用的最简单方法是什么?

标签 scala redis akka akka-stream akka-http

我想为以下场景流式传输分 block 服务器发送的事件:

订阅 Redis 键,如果键发生变化,使用 Akka Streams 流式传输新值。它应该只在有新值时流式传输。

据我了解,我需要一个Source。我猜这是对 channel 的订阅:

redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) => println(
    message.readAs[String]()
  )
  case PubSubMessage.Subscribe(channel, subscribedChannelsCount) => println(
    s"Successfully subscribed to $channel"
  )
}

在我的 route ,我需要从中创建一个 Source,但老实说我不知道​​如何开始:

val route: Route =
  path("stream") {
   get {
     complete {
       val source: Source[ServerSentEvent, NotUsed] =
         Source
          .asSubscriber(??) // or fromPublisher???
      .map(_ => {
        ??
      })
      .map(toServerSentEvent)
      .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      .log("stream")
     }
   }

最佳答案

一种方法是使用 Source.actorRefBroadcastHub.sink :

val (sseActor, sseSource) =
  Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
    .map(toServerSentEvent) // converts a String to a ServerSentEvent
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
    .run()

将具体化的 ActorRef 订阅到您的消息 channel :发送到此 actor 的消息会向下游发出。如果没有下游需求,则按照指定的溢出策略将消息缓存到一定数量(本例中缓存大小为10)。请注意,这种方法没有背压。

redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) =>
    val strMsg = message.readAs[String]
    println(strMsg)
    sseActor ! strMsg

  case ...
}

另请注意,上面的示例使用了 Source.actorRef[String];根据需要调整类型和示例(例如,它可以是 Source.actorRef[PubSubMessage.Message])。

并且您可以在您的路径中使用具体化的 Source:

path("stream") {
  get {
    complete(sseSource)
  }
}

关于scala - 将 SSE 与 Redis 发布/订阅和 Akka Streams 一起使用的最简单方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49028905/

相关文章:

scala - 如何通过提问方式和监督来处理异常

java - 如何在喷雾路由中将Java对象转换为Json

redis - 在redis中存储重复列表

redis - 如何像mysql一样在redis中进行与操作?

java - Akka 有限状态机实例

java - Playframework 从 2.4 迁移到 2.5 : java. lang.IllegalStateException:尝试调用 Materialize()

scala - 如何在Scala中进行模式匹配时为中间名分配名称

Scala 推断类型参数 - 推断为 'Nothing' 的类型边界

scala - 为什么不能将 Eithers 列表展平?

redis - Socket.io redis 数据如何存储和清除