我现在正在做一个 Scala 项目,我决定在 Actor 模型上使用 Akka 的代理库,因为它允许使用更实用的并发方法。但是,我在同时运行许多不同的代理时遇到了问题时间。似乎我只能同时运行三四个代理。
import akka.actor._
import akka.agent._
import scala.concurrent.ExecutionContext.Implicits.global
object AgentTester extends App {
// Create the system for the actors that power the agents
implicit val system = ActorSystem("ActorSystem")
// Create an agent for each int between 1 and 10
val agents = Vector.tabulate[Agent[Int]](10)(x=>Agent[Int](1+x))
// Define a function for each agent to execute
def printRecur(a: Agent[Int])(x: Int): Int = {
// Print out the stored number and sleep.
println(x)
Thread.sleep(250)
// Recur the agent
a sendOff printRecur(a) _
// Keep the agent's value the same
x
}
// Start each agent
for(a <- agents) {
Thread.sleep(10)
a sendOff printRecur(a) _
}
}
上面的代码创建了一个代理,其中包含 1 到 10 之间的每个整数。底部的循环将 printRecur 函数发送给每个代理。程序的输出应该显示每四分之一秒打印出的数字 1 到 10(尽管没有任何顺序)。但是,出于某种原因,我的输出仅显示输出的数字 1 到 4。
是否有更规范的方式在 Akka 中使用代理?我来自 clojure 背景,之前在那里成功使用过这种模式,所以我天真地在 Scala 中使用了相同的模式。
最佳答案
我的猜测是您在 4 核盒子上运行,这就是为什么您只看到数字 1-4 的部分原因。这里最重要的是你使用的是默认执行上下文,我猜你的系统使用了一个只有 4 个线程的线程池(每个内核一个)。按照您以这种递归方式进行编码的方式,我的猜测是前 4 个代理永远不会放弃线程,并且它们是唯一会打印任何内容的代理。
您可以通过删除此行轻松解决此问题:
import scala.concurrent.ExecutionContext.Implicits.global
并在创建
ActorSystem
后添加这一行import system.dispatcher
这将使用actor系统的默认调度程序,它是一个fork join调度程序,它似乎与您在示例中导入的默认执行上下文没有相同的问题。
您也可以考虑使用
send
而不是 sendOff
因为这将使用构建代理时可用的执行上下文。我认为人们会使用 sendOff
当他们有一个明确想要使用另一个执行上下文的情况时。
关于scala - Scala 中的并发 Akka 代理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26532745/