我想为以下场景流式传输分 block 服务器发送的事件:
订阅 Redis 键,如果键发生变化,使用 Akka Streams 流式传输新值。它应该只在有新值时流式传输。
据我了解,我需要一个Source
。我猜这是对 channel 的订阅:
redis.subscriber.subscribe("My Channel") {
case message @ PubSubMessage.Message(channel, messageBytes) => println(
message.readAs[String]()
)
case PubSubMessage.Subscribe(channel, subscribedChannelsCount) => println(
s"Successfully subscribed to $channel"
)
}
在我的 route ,我需要从中创建一个 Source
,但老实说我不知道如何开始:
val route: Route =
path("stream") {
get {
complete {
val source: Source[ServerSentEvent, NotUsed] =
Source
.asSubscriber(??) // or fromPublisher???
.map(_ => {
??
})
.map(toServerSentEvent)
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.log("stream")
}
}
最佳答案
一种方法是使用 Source.actorRef
和 BroadcastHub.sink
:
val (sseActor, sseSource) =
Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.map(toServerSentEvent) // converts a String to a ServerSentEvent
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
.run()
将具体化的 ActorRef
订阅到您的消息 channel :发送到此 actor 的消息会向下游发出。如果没有下游需求,则按照指定的溢出策略将消息缓存到一定数量(本例中缓存大小为10)。请注意,这种方法没有背压。
redis.subscriber.subscribe("My Channel") {
case message @ PubSubMessage.Message(channel, messageBytes) =>
val strMsg = message.readAs[String]
println(strMsg)
sseActor ! strMsg
case ...
}
另请注意,上面的示例使用了 Source.actorRef[String]
;根据需要调整类型和示例(例如,它可以是 Source.actorRef[PubSubMessage.Message]
)。
并且您可以在您的路径中使用具体化的 Source
:
path("stream") {
get {
complete(sseSource)
}
}
关于scala - 将 SSE 与 Redis 发布/订阅和 Akka Streams 一起使用的最简单方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49028905/