java - Akka 流 GraphStage

标签 java scala akka-stream

在 Akka Streams 中建议使用 GraphStage,但我找不到任何有关在 Java 中使用 getStageActor() 方法的文档(我找到的所有文档都使用了 Scala)。

如何将以下代码转换为 Java?

lazy val self: StageActor = getStageActor(onMessage)

private def onMessage(x: (ActorRef, Any)): Unit =
{
  x match {
    case (_, msg: String) =>
      log.info("received msg, queueing: {} ", msg)
      messages = messages.enqueue(msg)
      pump()
  }
}

最佳答案

根据 getStageActor method documentation ,它接受类型的值

scala.Function1<scala.Tuple2<ActorRef,java.lang.Object>, scala.runtime.BoxedUnit>

在 Scala 中看起来像

((ActorRef, AnyRef)) => Unit

在 Java 中,这种类型在语义上等同于(使用 Function 接口(interface))

Function<Tuple<ActorRef, Object>, Void>

哪里Tuple<A, B>是一个包含两个 A 类型值的类和B .

因此,调用getStageActor方法中,您需要创建上述类型的值。您可以直接通过构造扩展 AbstractFunction1 的类的实例来完成此操作。 :

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

getStateActor(new AbstractFunction1<Tuple2<ActorRef, Object>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Tuple2<ActorRef, Object> args) {
        return BoxedUnit.UNIT;
    }
});

如果您使用 Java 8,则可以使用 lambda 表达式在语法上更好的方法来实现此目的。

如果您使用 Scala 2.12+,则 scala.Function1是函数式接口(interface),可以直接使用lambda表达式:

getStateActor((args: Tuple2<ActorRef, Object>) -> BoxedUnit.UNIT);

如果您使用旧版本的 Scala,则由于特征的编译方式,Function1不是一个功能接口(interface),您需要使用scala-java8-compat图书馆。有了它,代码看起来像

import static scala.compat.java8.JFunction.*;

getStateActor(func((args: Tuple2<ActorRef, Object>) -> BoxedUnit.UNIT));

然后,为了实现该函数的逻辑,您可以使用 _1() 访问元组的元素。和_2()方法:

(args: Tuple2<ActorRef, Object>) -> {
    Object msg = args._2();
    if (msg instanceof String) {
        log.info("received msg, queueing: {} ", msg);
        messages = messages.enqueue((String) msg);
        pump();
    }
    return BoxedUnit.UNIT;
}

这是您想要转换的逻辑的直接翻译。

关于java - Akka 流 GraphStage,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48720602/

相关文章:

java - 如何处理java.lang.OutOfMemoryError : GC overhead limit exceeded gracefully?

java - 斐波那契数列检测器 Java

scala - Scala中的DSL是什么?

scala - 阿卡流 : dynamically grow outlets during graph builder phase

作为参数生命周期传递的 Java 内联对象

json - 使用 Argonaut 或 Circe 从不完整的 JSON 更新案例类

scala - 如果测试失败,如何配置 ScalaTest 以中止套件?

Java/Kotlin- Akka Stream Source.reduce 在 Source 中为 null 时不起作用

apache-spark - 如何有效地从 Cassandra 读取数百万行?

java.util.Logger 打包在不同操作系统中时的行为有所不同