multithreading - 在 Scala Actor 中集成自主和 react 行为的最佳方式?

标签 multithreading scala asynchronous actor event-based-programming

我是一名经验丰富的 Java 程序员,我正在开始开发基于 actor 的 Scala 应用程序。在我目前正在开发的应用程序中,我必须处理表现出自主和 react 行为的 Sender actor 的实现。场景如下(伪代码):

Actor Sender{

        Active behavior (must be active once the actor boots):
            do-in-sequence{
                send to Stdout A
                send to Stdout B
                send to Stdout C
                send stop to Stdout and then exit
            }

        Reactive behavior (must be active once the actor boots):
            as soon as received stop from StopNotifier -> send stop to Stdout and then exit
    }
}

Actor Stdout{
    Purely reactive behavior (i.e. wait for msg){
        as soon as received A -> print A
        as soon as received B -> print B
        as soon as received C -> print C
        as soon as received stop from Sender -> exit
    }
}
Actor StopNotifier
    Purely active behavior {
        compute, and when some condition is met -> send stop to Sender
    }

我的问题是:对于需要集成自主性和 react 性的 Scala Actor 来说,表达自主行为的最佳方式是什么(如 in this paper 所述)?

换句话说,在上面的例子中编码 Sender actor 的最佳方式/风格是什么?

我想出了一个解决方案(下面报告),但由于我不是 scala 大师(还 :))我想知道我实现的内容是否可以在更好/更好的解决方案中得到改进。
case object START
case object A
case object B
case object C
case object SENT_A
case object SENT_B
case object ACK_A
case object ACK_B
case object ACK_C
case object STOP

class Sender(stdout: Stdout) extends Actor {
    def act() {
        self!START
        while (true){
            receive {
                case START =>
                    stdout!?A
                    self!SENT_A
                case SENT_A =>
                    stdout!?B
                    self!SENT_B
                case SENT_B =>
                    stdout!?C
                    stdout!?STOP
                    exit()
                case STOP => {
                    Console.println("[Sender:]Received STOP, terminating")
                    stdout!?STOP
                    exit()
                }
            }
        }
    }
}

class Stdout() extends Actor {
    def act() { 
        while (true) {
            receive{
                case A =>
                    Console.println("[Stdout:]A")
                    reply(ACK_A)
                case B =>
                    Console.println("[Stdout:]B")
                    reply(ACK_B)
              case C =>
                    Console.println("[Stdout:]C")
                    reply(ACK_C)
                    exit()
              case STOP =>
                    Console.println("[Stdout:]Received STOP, terminating")
                    exit()
            }
        }
    }
}

class StopNotifier(sender: Sender) extends Actor {
    def act() {
        /*
         * The idea is that the StopNotifier should send a STOP message to the Sender
         * when a certain condition is met.
         * The sleep used here is just a semplification, since the detection of such
         * a condition is not relevant for the example.
         */

        Thread.sleep(200)
        Console.println("[StopNotifier:]Sending STOP to sender")
        sender ! STOP
        exit()
   }
}

object app extends Application {
    val stdout = new Stdout
    stdout.start
    val sender = new Sender(stdout)
    sender.start
    val stopNotifier = new StopNotifier(sender)
    stopNotifier.start
}

在我当前的实现中特别困扰我的是,为了能够对从 StopNotifier 接收到的 STOP 消息迅速使用react,我需要在 Sender 的每个执行步骤中自行发送消息。 (即在将 A、B 发送给 Stdout Actor 之后)。在我看来,做这些事情的正确方式太棘手了:)。

我还尝试使用其他 Scala 语言结构(例如异步发送、 react 等)开发其他解决方案,但在我看来,它们似乎受到其他问题/技巧的影响。

有没有人有更好的解决方案来处理 Scala 参与者中自主性和 react 性行为的整合?

最佳答案

如果我理解正确,您应该使用 Akka actor,特别是 Akka FSM,将发送方建模为状态机。 Akka actor 有一个内置的停止机制,或者你总是可以使用你自己的消息,这些消息可以通过 whenUnhandled 从所有状态处理。处理程序。

http://doc.akka.io/docs/akka/snapshot/scala/fsm.html

这显然是矫枉过正,但我​​认为您正在尝试做一些更复杂的事情。您也可以拥有 Stdout “ watch ”Sender以便它在 Sender 时终止终止而不是在收到特定消息时终止。见 Lifecycle Monitoring aka DeathWatch .

package fsmTest

import akka.actor._
import akka.util.duration._

sealed trait Msg
case object A extends Msg
case object B extends Msg
case object C extends Msg

sealed trait SenderState
case object Started extends SenderState
case object SentA extends SenderState
case object SentB extends SenderState

case class SenderData()

class Sender(stdout: ActorRef) 
  extends Actor 
  with FSM[SenderState, SenderData] {

  case object GoNextState

  startWith(Started, SenderData())

  when(Started) {
    case Event(GoNextState, data) => {
      stdout ! A
      goto(SentA) using data
    }
  }

  when(SentA) {
    case Event(GoNextState, data) => {
      stdout ! B
      goto(SentB) using data
    }
  }

  when(SentB) {
    case Event(GoNextState, data) => {
      stdout ! C
      goto(Started) using data
    }
  }

//      //Handle messages which aren't explicitly handled in state here
//      whenUnhandled {
//        case Event(SomeCustomStop, data) => {
//          stop(FSM.Shutdown)
//        }
//      }

  setTimer("goNextState", GoNextState, 1 second, repeat = true)

  initialize
}

class Stdout() extends Actor {
  def receive = {
    case msg: Msg => {
      context.watch(sender) //Not sure if you're gonna want to do this here, but you get the point
      println(msg)
    }
    case Terminated(_) => context.stop(self)
  }
}

object FSMTest extends App {

  implicit val system = ActorSystem("Testing")
  val stdout = system.actorOf(Props[Stdout], "stdout")
  val sender = system.actorOf(Props(new Sender(stdout)), "sender")

  system.scheduler.scheduleOnce(5 seconds) {
    system.stop(sender)
    system.shutdown()
  }
  system.awaitTermination(10 seconds)
}

无论您如何在发送方中实现状态,如果您想使用 Actors 对其进行建模,我相信您将需要“自我发送”消息,无论是在事件处理中还是在我上面使用的计时器中。

关于multithreading - 在 Scala Actor 中集成自主和 react 行为的最佳方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11416673/

相关文章:

c++ - 在 C++ 中,获取/释放原子访问与宽松访问与栅栏相结合之间是否存在任何有效区别?

scala - 检查已完成 Promise 的值的正确方法?

java - 用于创建异步流 API 的框架

c# - 异常记录中的空引用异常屏蔽真错误

处理异常的Python线程池

mysql jdbc 驱动程序 NumberFormatException

Scala: Option[T] as ?[T] (or even T?)

node.js - Node : does a find query to select n last records need to run asynchronously

c# - 等待 N 个异步方法完成时是否有标准模式可遵循?

c# - BackGroundWorker Thread中的ShowDialog和UI交互