scala - 从 Akka Actor 处获取消息

标签 scala akka actor

我构建了一个定期查询 API 的 Akka actor,如下所示:

  val cancellable =
    system.scheduler.schedule(0 milliseconds,
      5 seconds,
      actor,
      QueryController(1))

Actor 本质上是:

object UpdateStatistics {
  /**
   * Query the controller for the given switch Id
   *
   * @param dpId Switch's Id
   */
  case class QueryController(dpId: Int)
  case object Stop

  def props: Props = Props[UpdateStatistics]
}

class UpdateStatistics extends Actor with akka.actor.ActorLogging {
  import UpdateStatistics._

  def receive = {

    case QueryController(id) =>
      import context.dispatcher
      log.info(s"Receiving request to query controller")
      Future { FlowCollector.getSwitchFlows(1) } onComplete {
        f => self ! f.get
      }
    case Stop =>
      log.info(s"Shuting down")
      context stop self
    case json: JValue =>
      log.info("Getting json response, computing features...")
      val features = FeatureExtractor.getFeatures(json)
      log.debug(s"Features: $features")
      sender ! features
    case x =>
      log.warning("Received unknown message: {}", x)
  }
}

我想做的是从 UpdateStatistics actor 中获取 json:Jvalue 消息。阅读Akka docs我认为这可能有用:

  implicit val i = inbox()
  i.select() {
     case x => println(s"Valor Devuelto $x")
  }
  println(i receive(2.second))

但我不知道如何修改 UpdateStatistics actor 以便将结果发送到上面的收件箱。

我在文档中读到的另一个选项是 event streams .

但我认为这不是正确的方法。

有没有办法实现我想做的事情?或者我是否需要使用第二个 Actor 来向其发送 JSON 响应?

最佳答案

您可能正在寻找 AKKA 中的ask 模式。这将允许您向发送者返回一个值。

import akka.pattern.ask
import akka.util.duration._

implicit val timeout = Timeout(5 seconds)

val future = actor ? QueryController(1)    
val result = Await.result(future, timeout.duration).asInstanceOf[JValue]

println(result)

要实现此目的,您需要将响应发送给原始发送者,而不是自己。另外,您应该注意将来在处理消息时关闭 sender 的危险。

关于scala - 从 Akka Actor 处获取消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42204361/

相关文章:

scala - 通过任一/析取 Scala 处理多种错误类型

scala 类型问题 : SoftReference, ReferenceQueues, SoftHashMap

Scala.Either getOrElse 方法

java - 在 Scala 中调用 WebSocket 中的方法

scala - 跟踪 Scala actors 中的 "sender hierarchy"

scala - 按类型过滤 Scala 列表

scala - 使用 actor 的可中断循环模式

scala - Source.combine 不接受可变参数?

Scala 远程参与者

scala不喜欢akka中的自变量?