我目前正在使用 Akka Streams(Java 语言)进行个人项目,但我很难理解如何将元素发送到 Source
。
这个想法是使用 WebSocket 将内容推送到用户的 Web 浏览器中。我已经按照 Akka HTTP 文档设法使用 Akka Streams 创建请求响应系统,但这不是我想要做的。
查看 Akka Streams 文档,我看到有 Source.queue
和 Source.actorRef
。但我不明白如何将元素放入 Source
中。 Source.queue
和 Source.actorRef
返回一个 Source
,它没有方法 offer
(对于 Source.queue
)或tell
(对于Source.actorRef
)。
我的问题是:如何获取由 Source.actorRef
或 SourceQueueWithComplete
创建的 Source
的 ActorRef
使用 Source.queue
创建的 Source
的 code>,以便能够将元素发送到我的 Source
?
我搜索了各种 Akka 文档,但没有发现任何方法可以做到这一点。而且我在网上找到的大部分代码都是用Scala写的,似乎没有同样的问题。
最佳答案
来自 Source.actorRef
和 Source.queue
的 actor 和队列分别是 materialized values这些来源,这意味着只有在流运行时才能获取它们。例如:
final ActorRef actor =
Source.actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
.to(Sink.foreach(m -> System.out.println(m)))
.run(materializer);
actor.tell("do something", ActorRef.noSender());
这在 Scala 中没有什么不同:
implicit val materializer = ActorMaterializer()
val actor =
Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
.to(Sink.foreach(println))
.run()
actor ! "do something"
关于java - 如何在 Java 中将元素发送到 Source.actorRef 或 Source.queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47327565/