scala - 如何使用 Akka 远程处理通过 CLI 向远程参与者发送消息?

标签 scala akka command-line-interface akka-remote-actor akka-remoting

我有一个远程参与者 Bar 和一个本地参与者 Foo。我想在每次调用 CLI 时使用 Foo 将消息传递给 Bar

Bar 可以成功传递消息,但 Foo 在等待消息时挂起。为了解决这个问题,我在 Foo 的 main 末尾添加了一个 sys.exit(0) 。这会导致与 Foo 系统的关联问题。

如何在连续的 CLI 发布之间关闭本地 actor,而无需手动终止本地 actor?

闭嘴,给我代码!


福:

build.sbt

name := "Foo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"

fork in run := true

Main.scala

import akka.actor._
import com.typesafe.config.ConfigFactory

case class Config(mode: String = "", greeting: String="")

class Foo extends Actor {
  // create the remote actor
  val BarActor = context.actorSelection("akka.tcp://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="03416271507a7077666e433231342d332d332d32" rel="noreferrer noopener nofollow">[email protected]</a>:2552/user/BarActor")

  def receive = {
    case method: String => BarActor ! method
  }
}

object CommandLineInterface {

  val config = ConfigFactory.load()
  val system = ActorSystem("FooSystem", config.getConfig("FooApp"))

  val FooActor = system.actorOf(Props[Foo], name = "FooActor")

  val parser = new scopt.OptionParser[Config]("Foo") {
    head("foo", "1.x")

    help("help").text("prints usage text")

    opt[String]('m', "method").action( (x, c) =>
      c.copy(greeting = x) ).text("Bar will greet with <method>")
  }
}

object Main extends App {
  import CommandLineInterface.{parser, FooActor}

  parser.parse(args, Config()) match {
    case Some(config) => FooActor ! config.greeting
    case None => sys.error("Bad news...")
  }
  /* 
    When sys.exit(0) commented, this hangs and Bar greet.
    When sys.exit(0) uncommented, this doesn't hang, but also Bar doesn't greet.
   */
 
  //sys.exit(0)
}

application.conf

FooApp {
  akka {
    loglevel = "INFO"
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

栏:

build.sbt

name := "Bar"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"

Main.scala

import akka.actor._
import com.typesafe.config.ConfigFactory

class Bar extends Actor {
  def receive = {
    case greeting: String => Bar.greet(greeting)
  }
}

object Bar {
  val config = ConfigFactory.load()
  val system = ActorSystem("BarSystem", config.getConfig("BarApp"))
  val BarActor = system.actorOf(Props[Bar], name = "BarActor")

  def greet(greeting: String) = println(greeting)

  def main(args: Array[String]): Unit = {
    /* Intentionally empty */
  }
}

application.conf

BarApp {
  akka {
    loglevel = "INFO"
    actor {
      provider = remote
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

使用 sbt 'run-main Main -m hello' 运行 Foo,并使用 sbt 'run-main 运行 Bar主要'

抱歉代码太长,但这是解决我问题的 MVCE。

我怎样才能实现我想要的行为——CLI actor 在连续的 CLI 调用之间死亡,远程 actor 等待新消息。

最佳答案

发生这种情况是因为您在向 FooActor 发送消息后立即调用 sys.exit(0),因此应用程序很有可能在 之前退出FooActor 甚至有机会读取消息,更不用说将其转发给 BarActor 了。

似乎有 many possible solutions ,其中之一是:

class Foo extends Actor {
  // create the remote actor
  val BarActor = context.actorSelection("akka.tcp://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="37755645644e4443525a77060500190719071906" rel="noreferrer noopener nofollow">[email protected]</a>:2552/user/BarActor")

  override def receive = {
    case method: String => {
      BarActor ! method
      self ! PoisonPill
    }
  }

  override def postStop = {
    context.system.terminate
  }
}

不幸的是,事实证明系统在将消息分派(dispatch)到 Bar 之前仍然会关闭。

如果您想以“即发即忘”的方式发送消息,我找不到任何合理的解决方案来解决此问题。但是,在大多数情况下,希望从远程参与者获得某种响应,因此您可以这样做:

class Foo extends Actor {
  // create the remote actor
  val BarActor = context.actorSelection("akka.tcp://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="5a183b280923292e3f371a6b686d746a746a746b" rel="noreferrer noopener nofollow">[email protected]</a>:2552/user/BarActor")

  override def receive = {
    case method: String => {
      BarActor ! method
      context.become(waitingToKillMyself)
    }
  }

  def waitingToKillMyself: Receive = {
    case response: String => {
      println(response)
      self ! PoisonPill
    }
  }

  override def postStop = {
    context.system.terminate
  }
}

// ...

object Main extends App {
  import CommandLineInterface.{parser, FooActor, system}
  import system.dispatcher

  parser.parse(args, Config()) match {
    case Some(config) => {
      FooActor ! config.greeting
      system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill)
    }

    case None => sys.error("Bad news...")
  }
}

酒吧:

class Bar extends Actor {
  def receive = {
    case greeting: String => {
      Bar.greet(greeting)
      sender() ! "OK"
    }
  }
}

关于scala - 如何使用 Akka 远程处理通过 CLI 向远程参与者发送消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40771802/

相关文章:

c++ - Scala宏和C++模板的异同

json - 使用 circe-optics 从 json 中检索空值

scala - 从 Actor 返回实际结果而不是 Promise/Future

ubuntu - 如何在 Ubuntu 中摆脱 jenv?

node.js - NodeJS 应用程序作为 CLI 工具

scala - 我可以重写类构造函数参数名称以将其重用为公共(public)属性吗?

scala - Play 2.4 : intercept and modify response body

Akka 持久化未处理的消息

java - 如何在 Akka 2.2.4 中使用 UntypedActorFactory()

linux - Alsa 无法设置样本格式[FFMPEG]