scala - Akka 消息传递时序

标签 scala akka artificial-life

我正在使用 Scala 和 Akka 进行人工生命模拟,到目前为止我对两者都非常满意。我在时间方面遇到了一些问题,但是我无法完全解释。
目前,我模拟中的每只动物都是一对 Actor (动物 + 大脑)。通常,这两个参与者轮流进行(动物将传感器输入发送到大脑,等待结果,对其采取行动并重新开始)。然而,动物时不时地需要彼此互动以互相吃掉或繁殖。
对我来说很奇怪的一件事是时机。事实证明,从一只动物向另一只动物发送信息比从动物发送到大脑要慢很多(大约 100 倍)。与素食者和无性动物相比,这使我可怜的掠食者和性活跃的动物处于不利地位(免责声明:我自己是素食主义者,但我认为成为素食者有更好的理由而不是在试图狩猎时陷入困境。 .)
我提取了一个演示问题的最小代码片段:

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case BrainResponse =>
      brainNanos += (System.nanoTime() - brainRequestStartTime)
      brainCount += 1
      // Animal interactions are rare
      if (Random.nextDouble() < 0.01) {
        // Send a ping to a random other one (or ourselves). Defer our own loop
        val randomOther = animals.get(Random.nextInt(animals.get.length))
        peerRequestStartTime = System.nanoTime()
        randomOther ! PeerRequest
      } else {
        performLoop()
      }
    case PeerResponse =>
      peerNanos += (System.nanoTime() - peerRequestStartTime)
      peerCount += 1
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop() = {
    brain ! BrainRequest
    brainRequestStartTime = System.nanoTime()
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}
这是这里发生的事情:
  • 我正在创作 50 对动物/大脑 Actor
  • 它们都启动并运行了 5 秒
  • 每只动物都做一个无限循环,轮流用它的大脑
  • 在所有运行的 1% 中,动物向随机的其他动物发送 ping 并等待其回复。然后,它用它的大脑
  • 继续它的循环。
  • 对大脑和对等端的每个请求都进行了测量,这样我们就可以得到一个平均值
  • 5 秒后,一切都停止了,比较大脑请求和 ping 对等方的时间

  • 在我的双核 i7 上,我看到这些数字:

    Average time for brain request: 0.004708ms (sampled from 21073859 requests)

    Average time for peer pings: 0.66866ms (sampled from 211167 requests)


    因此,对同级的 ping 比对大脑的请求慢 165 倍。我一直在尝试很多方法来解决这个问题(例如优先邮箱和预热 JIT),但一直无法弄清楚发生了什么。有没有人有想法?

    最佳答案

    我认为您应该使用询问模式来处理消息。在您的代码中,BrainRequest 被发送到大脑参与者,然后它发送回 BrainResponse。问题出在这里。 BrainResponse 不是 BrainRequest 的响应。也许是之前 BrainRequest 的回应。

    以下代码使用询问模式,性能结果几乎相同。

    package edu.blindworld.test
    
    import java.util.concurrent.TimeUnit
    
    import akka.actor.{ActorRef, ActorSystem, Props, Actor}
    import akka.pattern.ask
    import akka.util.Timeout
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import scala.util.Random
    
    class Animal extends Actor {
      val brain = context.actorOf(Props(classOf[Brain]))
      var animals: Option[List[ActorRef]] = None
    
      var brainCount = 0
      var brainRequestStartTime = 0L
      var brainNanos = 0L
    
      var peerCount = 0
      var peerRequestStartTime = 0L
      var peerNanos = 0L
    
      override def receive = {
        case Go(all) =>
          animals = Some(all)
          performLoop()
        case PeerRequest =>
          sender() ! PeerResponse
        case Stop =>
          sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
          context.stop(brain)
          context.stop(self)
      }
    
      def performLoop(): Unit = {
        brainRequestStartTime = System.nanoTime()
        brain.ask(BrainRequest)(10.millis) onSuccess {
          case _ =>
            brainNanos += (System.nanoTime() - brainRequestStartTime)
            brainCount += 1
            // Animal interactions are rare
            if (Random.nextDouble() < 0.01) {
              // Send a ping to a random other one (or ourselves). Defer our own loop
              val randomOther = animals.get(Random.nextInt(animals.get.length))
              peerRequestStartTime = System.nanoTime()
              randomOther.ask(PeerRequest)(10.millis) onSuccess {
                case _ =>
                  peerNanos += (System.nanoTime() - peerRequestStartTime)
                  peerCount += 1
                  performLoop()
              }
            } else {
              performLoop()
            }
        }
      }
    }
    
    class Brain extends Actor {
      override def receive = {
        case BrainRequest =>
          sender() ! BrainResponse
      }
    }
    
    case class Go(animals: List[ActorRef])
    case object Stop
    case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)
    
    case object BrainRequest
    case object BrainResponse
    
    case object PeerRequest
    case object PeerResponse
    
    object ActorTest extends App {
      println("Sampling...")
      val system = ActorSystem("Test")
      val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
      animals.foreach(_ ! Go(animals))
      Thread.sleep(5000)
      implicit val timeout = Timeout(5, TimeUnit.SECONDS)
      val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
      val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
      val brainCount = stats.foldLeft(0)(_ + _.brainCount)
      val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
      val peerCount = stats.foldLeft(0)(_ + _.peerCount)
      val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
      println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
      println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
      system.shutdown()
    }
    

    关于scala - Akka 消息传递时序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30419447/

    相关文章:

    java - 同时使用两个路由器配置的 Akka 路由

    scala - Scala 的哪些特性允许使用 Props[SomeActor] 语法

    c++ - 生命模拟的有效方法

    artificial-intelligence - 我是否应该向由遗传算法训练的人工神经网络添加偏差

    java - 在 Java 中使用类型别名调用 Scala Aux 特征

    java - Akka:创建许多 child Actor 与重用单个 child Actor

    scala - 如何创建自定义注释,如 BeanProperty

    java - 植绒小鸟行为问题

    scala - NoClassDefFound错误: org/apache/spark/sql/SparkSession$ while running spark source code locally

    java - 关于elasticsearch java api错误的奇怪问题