scala - Akka TypedActor - 如何正确处理带有上下文的异步响应

标签 scala akka actor typedactor

我已经开始在 Scala 中使用 TypedActors,并且在做一些非常简单的事情时遇到了问题:我希望 Actor A 调用 Actor B 上的方法并在 Actor A 上的匿名函数中处理结果,但要确保:

  • 我的响应处理函数是线程安全的,例如不会与访问 Actor A 状态的任何其他线程同时运行
  • 我的响应处理函数可以引用 Actor A 的上下文

  • 我怎样才能(或我能)同时满足这两个要求?

    例如,这个actor只想调用otherActor上的一个返回Future[Int]的API,并用结果更新它的状态,然后做一些需要它的actor上下文的事情:
    class MyActorImpl extends MyActor {
    
      // my mutable state
      var myNumber = 0
    
      // method proxied by TypedActor ref:
      def doStuff(otherActor: OtherActor): Unit = {
        otherActor.doOtherStuff onSuccess {
          // oops this is no longer running in MyActorImpl..
          // this could be on a concurrent thread if we
          case i => processResult(i)
        }
      }
    
      private def processResult(i: Int): Unit = {
        myNumber = 0 // oops, now we are possibly making a concurrent modification
        println(s"Got $i")
    
        // fails with java.lang.IllegalStateException: Calling TypedActor.context
        // outside of a TypedActor implementation method!
        println(s"My context is ${TypedActor.context}")
      }
    }
    

    我在这里想念什么?我是否需要编写我的处理程序来调用代理接口(interface)上定义的方法以保证单条目?如果我不想在接口(interface)上公开那个特定的“私有(private)”方法(例如 processResult),那看起来会很丑陋。

    这是将在 Scala REPL 中运行的完整版本:
    import akka.actor._
    import scala.concurrent._
    
    val system = ActorSystem("mySystem")
    import system.dispatcher
    
    trait OtherActor {
      def doOtherStuff(): Future[Int]
    }
    
    
    trait MyActor {
      def doStuff(otherActor: OtherActor): Unit
    }
    
    class OtherActorImpl extends OtherActor {
      var i = 0
      def doOtherStuff(): Future[Int] = {
        i += 1
        Future {i} 
      }
    }
    
    class MyActorImpl extends MyActor {
    
      // my mutable state
      var myNumber = 0
    
      // method proxied by TypedActor ref:
      def doStuff(otherActor: OtherActor): Unit = {
        otherActor.doOtherStuff onSuccess {
          // oops this is no longer running in MyActorImpl..
          // this could be on a concurrent thread if we
          case i => processResult(i)
        }
      }
    
      private def processResult(i: Int): Unit = {
        myNumber = 0 // oops, now we are possibly making a concurrent modification
        println(s"Got $i")
    
        // fails with java.lang.IllegalStateException: Calling TypedActor.context
        // outside of a TypedActor implementation method!
        println(s"My context is ${TypedActor.context}")
      }
    }
    
    val actor1: MyActor = TypedActor(system).typedActorOf(TypedProps[MyActorImpl])
    val actor2: OtherActor = TypedActor(system).typedActorOf(TypedProps[OtherActorImpl])
    
    actor1.doStuff(actor2)
    

    最佳答案

    您将 Actor 的状态暴露给外部世界,这是一件非常糟糕的事情。看这里:http://doc.akka.io/docs/akka/2.3.3/general/jmm.html第 9-10 节 Actors and shared mutable state lines 描述了你的情况。

    @philwalk 已经描述了如何解决这个问题:Akka TypedActor - how to correctly handle async responses with context

    关于scala - Akka TypedActor - 如何正确处理带有上下文的异步响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23306255/

    相关文章:

    javascript - Play framework 2.0.4 select box 排序顺序

    scala - Windows机器上的spark scala

    java - Akka:自定义 akka 监督策略未使用 Java 注册

    scala - 如何将 Source 动态添加到现有 Graph?

    multithreading - 允许actor定期替换共享的不可变只读状态是 “ok”吗?

    scala - 异常后恢复之前的 actor 状态

    scala - 有没有办法查看 Scala 比赛期间通配符模式接收到的内容?

    c - 动态调度,C

    scala - 声明一种接受未知案例类作为参数的方法,以在 Scala 中的模式匹配中使用它

    scala - Play2.0 : Restart on fatal error?