akka - 访问 Actor 邮箱的消息

标签 akka

我想阅读 ShardRegions 邮箱中的消息。在以前的akka​​版本中,我们可以使用下面的代码只获取邮箱的大小:

getContext().getMailboxSize();

有没有办法获取邮箱中的消息类型?

最佳答案

解决方法

像这样为分片区域定义一个包装邮箱:

class UnboundedMailboxWrapper extends MailboxType with ProducesMessageQueue[UnboundedMailboxWrapper.MessageQueue] {
  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new UnboundedMailboxWrapper.MessageQueue
}

object UnboundedMailboxWrapper {
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    val runtime = RuntimeManager.runtime
    final def queue: Queue[Envelope] = this
    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      runtime.queue.add(handle)
      queue add handle
    }
    override def dequeue(): Envelope = {
      if (!runtime.queue.isEmpty) runtime.mailbox.queue.poll()
      queue.poll()
    }
  }
}

在这个邮箱中,我们将添加的元素复制到另一个队列中,因此在计算大小和对其进行其他操作时,对邮箱队列的性能没有任何影响

在重复队列中,我们可以计算消息的数量并对它们进行排序:

def getElemets(): String = {
  runtime.queue.asScala.toList.groupBy(_.message.getClass.getName)
    .map(e ⇒ (e._1, e._2.length)).toSeq
    .sortBy(_._2).foldLeft("") { (a, b) ⇒
    b._1 + ":" + b._2 + "\n" + a
  }
}

并且使用 JMX 或任何其他方式我们可以在运行时调用此方法

最后将这个邮箱分配给 ShardRegion 调度程序:

monit-dispatcher {
  mailbox-type = "im.actor.server.cluster.UnboundedMailboxWrapper"
}

akka.cluster.sharding {
  use-dispatcher = "monit-dispatcher"
}

关于akka - 访问 Actor 邮箱的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46033962/

相关文章:

Java 可选 : map to subclass or else super class

AKKA 和其他消息系统

akka - 如何将多个参与者作为源附加到 Akka 流?

erlang - Erlang/Akka 等如何在后台发送消息?为什么不会导致僵局?

scala - 创建一个 Actor

scala - 如何将 csv 文件作为 akka http 响应发送?

java - 如何在 Java 应用程序中使用 Akka Actors?

scala - Akka:当你告诉 ActorRef 并且它希望你问时会发生什么?

multithreading - Akka 中使用 Actor 模型的读写器锁

scala - 动态合并 Akka 流