我正在使用 Scala 和 Akka 进行人工生命模拟,到目前为止我对两者都非常满意。我在时间方面遇到了一些问题,但是我无法完全解释。
目前,我模拟中的每只动物都是一对 Actor (动物 + 大脑)。通常,这两个参与者轮流进行(动物将传感器输入发送到大脑,等待结果,对其采取行动并重新开始)。然而,动物时不时地需要彼此互动以互相吃掉或繁殖。
对我来说很奇怪的一件事是时机。事实证明,从一只动物向另一只动物发送信息比从动物发送到大脑要慢很多(大约 100 倍)。与素食者和无性动物相比,这使我可怜的掠食者和性活跃的动物处于不利地位(免责声明:我自己是素食主义者,但我认为成为素食者有更好的理由而不是在试图狩猎时陷入困境。 .)
我提取了一个演示问题的最小代码片段:
package edu.blindworld.test
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random
class Animal extends Actor {
val brain = context.actorOf(Props(classOf[Brain]))
var animals: Option[List[ActorRef]] = None
var brainCount = 0
var brainRequestStartTime = 0L
var brainNanos = 0L
var peerCount = 0
var peerRequestStartTime = 0L
var peerNanos = 0L
override def receive = {
case Go(all) =>
animals = Some(all)
performLoop()
case BrainResponse =>
brainNanos += (System.nanoTime() - brainRequestStartTime)
brainCount += 1
// Animal interactions are rare
if (Random.nextDouble() < 0.01) {
// Send a ping to a random other one (or ourselves). Defer our own loop
val randomOther = animals.get(Random.nextInt(animals.get.length))
peerRequestStartTime = System.nanoTime()
randomOther ! PeerRequest
} else {
performLoop()
}
case PeerResponse =>
peerNanos += (System.nanoTime() - peerRequestStartTime)
peerCount += 1
performLoop()
case PeerRequest =>
sender() ! PeerResponse
case Stop =>
sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
context.stop(brain)
context.stop(self)
}
def performLoop() = {
brain ! BrainRequest
brainRequestStartTime = System.nanoTime()
}
}
class Brain extends Actor {
override def receive = {
case BrainRequest =>
sender() ! BrainResponse
}
}
case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)
case object BrainRequest
case object BrainResponse
case object PeerRequest
case object PeerResponse
object ActorTest extends App {
println("Sampling...")
val system = ActorSystem("Test")
val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
animals.foreach(_ ! Go(animals))
Thread.sleep(5000)
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
val brainCount = stats.foldLeft(0)(_ + _.brainCount)
val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
val peerCount = stats.foldLeft(0)(_ + _.peerCount)
val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
system.shutdown()
}
这是这里发生的事情:在我的双核 i7 上,我看到这些数字:
Average time for brain request: 0.004708ms (sampled from 21073859 requests)
Average time for peer pings: 0.66866ms (sampled from 211167 requests)
因此,对同级的 ping 比对大脑的请求慢 165 倍。我一直在尝试很多方法来解决这个问题(例如优先邮箱和预热 JIT),但一直无法弄清楚发生了什么。有没有人有想法?
最佳答案
我认为您应该使用询问模式来处理消息。在您的代码中,BrainRequest 被发送到大脑参与者,然后它发送回 BrainResponse。问题出在这里。 BrainResponse 不是 BrainRequest 的响应。也许是之前 BrainRequest 的回应。
以下代码使用询问模式,性能结果几乎相同。
package edu.blindworld.test
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
class Animal extends Actor {
val brain = context.actorOf(Props(classOf[Brain]))
var animals: Option[List[ActorRef]] = None
var brainCount = 0
var brainRequestStartTime = 0L
var brainNanos = 0L
var peerCount = 0
var peerRequestStartTime = 0L
var peerNanos = 0L
override def receive = {
case Go(all) =>
animals = Some(all)
performLoop()
case PeerRequest =>
sender() ! PeerResponse
case Stop =>
sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
context.stop(brain)
context.stop(self)
}
def performLoop(): Unit = {
brainRequestStartTime = System.nanoTime()
brain.ask(BrainRequest)(10.millis) onSuccess {
case _ =>
brainNanos += (System.nanoTime() - brainRequestStartTime)
brainCount += 1
// Animal interactions are rare
if (Random.nextDouble() < 0.01) {
// Send a ping to a random other one (or ourselves). Defer our own loop
val randomOther = animals.get(Random.nextInt(animals.get.length))
peerRequestStartTime = System.nanoTime()
randomOther.ask(PeerRequest)(10.millis) onSuccess {
case _ =>
peerNanos += (System.nanoTime() - peerRequestStartTime)
peerCount += 1
performLoop()
}
} else {
performLoop()
}
}
}
}
class Brain extends Actor {
override def receive = {
case BrainRequest =>
sender() ! BrainResponse
}
}
case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)
case object BrainRequest
case object BrainResponse
case object PeerRequest
case object PeerResponse
object ActorTest extends App {
println("Sampling...")
val system = ActorSystem("Test")
val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
animals.foreach(_ ! Go(animals))
Thread.sleep(5000)
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
val brainCount = stats.foldLeft(0)(_ + _.brainCount)
val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
val peerCount = stats.foldLeft(0)(_ + _.peerCount)
val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
system.shutdown()
}
关于scala - Akka 消息传递时序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30419447/