scala - Akka Actor 、 future 和闭包

标签 scala closures actor akka future

我读了 Akka docs关闭来自封闭参与者的变量是危险的。

Warning

In this case you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods on the enclosing actor from within the anonymous Actor class. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the other actor’s code will be scheduled concurrently to the enclosing actor.


现在,我有两个 Actor ,其中一个从第二个那里请求一些东西,并对结果做一些事情。在我整理的下面这个例子中,actor Accumulator 从actor NumberGenerator 检索数字并将它们相加,沿途报告总和。
这可以通过至少两种不同的方式来完成,如本示例所示,使用两种不同的接收功能(A 与 B)。两者的区别在于A不关闭计数器变量;相反,它等待一个整数并将其相加,而 B 创建一个 Future 在柜台上关闭并计算总和。如果我正确理解它是如何工作的,这会发生在一个为了处理 onSuccess 而创建的匿名参与者中。
import com.esotericsoftware.minlog.Log

import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest {
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }
  // B: WITH CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()

  def receive = {
    case Request => sender ! rand.nextInt(11)-5
  }
}
在这种情况下使用闭包绝对是邪恶的吗?当然,我可以使用 AtomicInteger 而不是 Int,或者在某些网络场景中使用,例如 netty , 对 threadsafe 发出写操作 channel ,但这不是我的重点。
冒着问荒谬的风险: future 的 onSuccess 是否有办法在 中执行此 Actor 而不是匿名中间 Actor ,没有 在接收函数中定义一个案例?
编辑
更清楚地说,我的问题是:有没有办法强制一系列 Future 与给定的 Actor 在同一线程中运行?

最佳答案

问题在于onSuccess将在与 Actor 的线程不同的线程中运行 receive即将运行。您可以使用 pipeTo方法,或使用 Agent .制作 counter AtomicInteger会解决这个问题,但它不是那么干净 - 也就是说,它打破了 Actor 模型。

关于scala - Akka Actor 、 future 和闭包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11130003/

相关文章:

scala - mllib Vector 的最大值?

scala - 如何在 play 2.0 模板中格式化数字/日期?

scala - IntelliJ IDEA 2020.3 Scala 插件不适用于新项目

C++ 将方法和字段带入外部范围,无需显式引用

scala - 事件驱动的递归actor是否有可能出现 StackOverflow 错误?

Java 8 并行排序与 Scala 并行排序

JavaScript + Firebug : "cannot access optimized closure" What does it mean?

ios - 重载iOS Swift中的通用函数

multithreading - Common Lisp 中的 Actor 模型是否有任何好的资源,以及关于 Actor 模型的一般文档?

java - Akka actor resizer 没有创建实例