java - 如何在 Java 中将元素发送到 Source.actorRef 或 Source.queue

标签 java akka akka-stream akka-http

我目前正在使用 Akka Streams(Java 语言)进行个人项目,但我很难理解如何将元素发送到 Source

这个想法是使用 WebSocket 将内容推送到用户的 Web 浏览器中。我已经按照 Akka HTTP 文档设法使用 Akka Streams 创建请求响应系统,但这不是我想要做的。

查看 Akka Streams 文档,我看到有 Source.queueSource.actorRef。但我不明白如何将元素放入 Source 中。 Source.queueSource.actorRef 返回一个 Source,它没有方法 offer(对于 Source.queue)或tell(对于Source.actorRef)。

我的问题是:如何获取由 Source.actorRefSourceQueueWithComplete 创建的 SourceActorRef使用 Source.queue 创建的 Source 的 code>,以便能够将元素发送到我的 Source

我搜索了各种 Akka 文档,但没有发现任何方法可以做到这一点。而且我在网上找到的大部分代码都是用Scala写的,似乎没有同样的问题。

最佳答案

来自 Source.actorRefSource.queue 的 actor 和队列分别是 materialized values这些来源,这意味着只有在流运行时才能获取它们。例如:

final ActorRef actor =
  Source.actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
        .to(Sink.foreach(m -> System.out.println(m)))
        .run(materializer);

actor.tell("do something", ActorRef.noSender());

这在 Scala 中没有什么不同:

implicit val materializer = ActorMaterializer()

val actor =
  Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
        .to(Sink.foreach(println))
        .run()

actor ! "do something"

关于java - 如何在 Java 中将元素发送到 Source.actorRef 或 Source.queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47327565/

相关文章:

java - Akka 从源代码修改/创建配置文件

scala - 如何使用 Akka Persistence 保存流式数据

scala - 您将如何更改此 Akka Streams 示例以获得物化值 Future[ServerBinding]?

java - 使用java将大量数据存储到mysql的最佳方法

java - 在 maven 中准备 Ear 文件

java - Hibernate ManyToOne FetchType.LAZY 不起作用?

java - 由于socket异常,mongodb插入失败

java - 为什么 Akka 保留所有这些线程?

java - 编译器错误 : com. sun.tools.javac.code.Symbol$CompletionFailure : class file for akka. actor.SupervisorStrategy$1 未找到

scala - 访问由 Source.actorRef 创建的 akka 流源的底层 ActorRef