我是一名经验丰富的 Java 程序员,我正在开始开发基于 actor 的 Scala 应用程序。在我目前正在开发的应用程序中,我必须处理表现出自主和 react 行为的 Sender actor 的实现。场景如下(伪代码):
Actor Sender{
Active behavior (must be active once the actor boots):
do-in-sequence{
send to Stdout A
send to Stdout B
send to Stdout C
send stop to Stdout and then exit
}
Reactive behavior (must be active once the actor boots):
as soon as received stop from StopNotifier -> send stop to Stdout and then exit
}
}
Actor Stdout{
Purely reactive behavior (i.e. wait for msg){
as soon as received A -> print A
as soon as received B -> print B
as soon as received C -> print C
as soon as received stop from Sender -> exit
}
}
Actor StopNotifier
Purely active behavior {
compute, and when some condition is met -> send stop to Sender
}
我的问题是:对于需要集成自主性和 react 性的 Scala Actor 来说,表达自主行为的最佳方式是什么(如 in this paper 所述)?
换句话说,在上面的例子中编码 Sender actor 的最佳方式/风格是什么?
我想出了一个解决方案(下面报告),但由于我不是 scala 大师(还 :))我想知道我实现的内容是否可以在更好/更好的解决方案中得到改进。
case object START
case object A
case object B
case object C
case object SENT_A
case object SENT_B
case object ACK_A
case object ACK_B
case object ACK_C
case object STOP
class Sender(stdout: Stdout) extends Actor {
def act() {
self!START
while (true){
receive {
case START =>
stdout!?A
self!SENT_A
case SENT_A =>
stdout!?B
self!SENT_B
case SENT_B =>
stdout!?C
stdout!?STOP
exit()
case STOP => {
Console.println("[Sender:]Received STOP, terminating")
stdout!?STOP
exit()
}
}
}
}
}
class Stdout() extends Actor {
def act() {
while (true) {
receive{
case A =>
Console.println("[Stdout:]A")
reply(ACK_A)
case B =>
Console.println("[Stdout:]B")
reply(ACK_B)
case C =>
Console.println("[Stdout:]C")
reply(ACK_C)
exit()
case STOP =>
Console.println("[Stdout:]Received STOP, terminating")
exit()
}
}
}
}
class StopNotifier(sender: Sender) extends Actor {
def act() {
/*
* The idea is that the StopNotifier should send a STOP message to the Sender
* when a certain condition is met.
* The sleep used here is just a semplification, since the detection of such
* a condition is not relevant for the example.
*/
Thread.sleep(200)
Console.println("[StopNotifier:]Sending STOP to sender")
sender ! STOP
exit()
}
}
object app extends Application {
val stdout = new Stdout
stdout.start
val sender = new Sender(stdout)
sender.start
val stopNotifier = new StopNotifier(sender)
stopNotifier.start
}
在我当前的实现中特别困扰我的是,为了能够对从 StopNotifier 接收到的 STOP 消息迅速使用react,我需要在 Sender 的每个执行步骤中自行发送消息。 (即在将 A、B 发送给 Stdout Actor 之后)。在我看来,做这些事情的正确方式太棘手了:)。
我还尝试使用其他 Scala 语言结构(例如异步发送、 react 等)开发其他解决方案,但在我看来,它们似乎受到其他问题/技巧的影响。
有没有人有更好的解决方案来处理 Scala 参与者中自主性和 react 性行为的整合?
最佳答案
如果我理解正确,您应该使用 Akka actor,特别是 Akka FSM,将发送方建模为状态机。 Akka actor 有一个内置的停止机制,或者你总是可以使用你自己的消息,这些消息可以通过 whenUnhandled
从所有状态处理。处理程序。
见 http://doc.akka.io/docs/akka/snapshot/scala/fsm.html
这显然是矫枉过正,但我认为您正在尝试做一些更复杂的事情。您也可以拥有 Stdout
“ watch ”Sender
以便它在 Sender
时终止终止而不是在收到特定消息时终止。见 Lifecycle Monitoring aka DeathWatch .
package fsmTest
import akka.actor._
import akka.util.duration._
sealed trait Msg
case object A extends Msg
case object B extends Msg
case object C extends Msg
sealed trait SenderState
case object Started extends SenderState
case object SentA extends SenderState
case object SentB extends SenderState
case class SenderData()
class Sender(stdout: ActorRef)
extends Actor
with FSM[SenderState, SenderData] {
case object GoNextState
startWith(Started, SenderData())
when(Started) {
case Event(GoNextState, data) => {
stdout ! A
goto(SentA) using data
}
}
when(SentA) {
case Event(GoNextState, data) => {
stdout ! B
goto(SentB) using data
}
}
when(SentB) {
case Event(GoNextState, data) => {
stdout ! C
goto(Started) using data
}
}
// //Handle messages which aren't explicitly handled in state here
// whenUnhandled {
// case Event(SomeCustomStop, data) => {
// stop(FSM.Shutdown)
// }
// }
setTimer("goNextState", GoNextState, 1 second, repeat = true)
initialize
}
class Stdout() extends Actor {
def receive = {
case msg: Msg => {
context.watch(sender) //Not sure if you're gonna want to do this here, but you get the point
println(msg)
}
case Terminated(_) => context.stop(self)
}
}
object FSMTest extends App {
implicit val system = ActorSystem("Testing")
val stdout = system.actorOf(Props[Stdout], "stdout")
val sender = system.actorOf(Props(new Sender(stdout)), "sender")
system.scheduler.scheduleOnce(5 seconds) {
system.stop(sender)
system.shutdown()
}
system.awaitTermination(10 seconds)
}
无论您如何在发送方中实现状态,如果您想使用 Actors 对其进行建模,我相信您将需要“自我发送”消息,无论是在事件处理中还是在我上面使用的计时器中。
关于multithreading - 在 Scala Actor 中集成自主和 react 行为的最佳方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11416673/