scala - Akka 调度模式

标签 scala akka

考虑经典的“字数统计”程序。它计算某个目录中所有文件中的单词数。 Master 接收一些目录并在 Worker Actor 之间拆分作业(每个 Worker 使用一个文件)。这是伪代码:

class WordCountWorker extends Actor {

  def receive = {
    case FileToCount(fileName:String) =>
      val count = countWords(fileName)
      sender ! WordCount(fileName, count)
  }
}

class WordCountMaster extends Actor {
  def receive = {
    case StartCounting(docRoot) => // sending each file to worker
      val workers = createWorkers()
      fileNames = scanFiles(docRoot)
      sendToWorkers(fileNames, workers)
    case WordCount(fileName, count) => // aggregating results
      ...

  }
}

但我想按计划(例如每 1 分钟)运行这个 Word Count 程序,提供不同的目录进行扫描。

Akka 为调度消息传递提供了很好的方法:
system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))

但是上述调度程序的问题是在调度程序通过tick发送新消息时开始的,但之前的消息尚未处理(例如我发送消息扫描某个大目录,1秒后我发送另一条消息扫描另一个目录,所以操作第一个目录的处理尚未完成)。所以结果我的WordCountMaster将收到WordCount来自处理不同目录的工作人员的消息。

作为一种解决方法,而不是安排消息发送,我可以安排一些代码块的执行,每次都会创建新的 WordCountMaster . IE。一个目录 = 一个 WordCountMaster .但我认为它效率低下,而且我需要注意为 WordCountMaster 提供唯一名称避免InvalidActorNameException .

所以我的问题是:我应该创建新的 WordCountMaster对于我在上面段落中提到的每个刻度?或者有一些更好的想法/模式如何重新设计这个程序以支持调度?

一些更新:
如果为每个目录创建一个主 Actor ,我会遇到一些问题:
  • Actor 命名问题

  • InvalidActorNameException: actor name [WordCountMaster] is not unique!





    InvalidActorNameException: actor name [WordCountWorker ] is not unique!



    我可以克服这个问题,只是不提供 Actor 姓名。但在这种情况下,我的 Actor 会收到自动生成的名称,例如 $a , $b等等。这对我不好。
  • 配置问题:

  • 我想将我的路由器配置排除到 application.conf . IE。我想为每个 WordCountWorker 提供相同的配置路由器。但由于我不控制 Actor 姓名,我不能使用下面的配置,因为我不知道 Actor 姓名:
      /wordCountWorker{
        router = smallest-mailbox-pool
        nr-of-instances = 5
        dispatcher = word-counter-dispatcher
      }
    

    最佳答案

    我不是 Akka 专家,但我认为每个聚合都有一个 Actor 的方法并不是低效的。您需要以某种方式保持并发聚合分离。您可以给每个聚合一个 id,以便在唯一的主 actor 中将它们用 id 分隔,或者您可以使用 Akka actor 命名和生命周期逻辑,并将每个计数轮的每个聚合委托(delegate)给将存活的 actor只是为了那个聚合逻辑。

    对我来说,每个聚合使用一个 Actor 似乎更优雅。

    另请注意,Akka 有一个聚合模式的实现,如 here 所述。

    关于scala - Akka 调度模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30446507/

    相关文章:

    Scala 编译器表示 () 为 "error: identifier expected but integer literal found."而不是 {}

    scala - 如何在Scala中将类方法作为参数传递

    java - 使用 Akka Streams 询问、告诉或转发 Actor

    scala - Akka HTTP 2.0 使用 SSL (HTTPS)

    scala - 我的Akka logback.xml有什么问题?

    java - 在 Play 中动态设置 CompletionStages 的线程池大小!与 Akka

    java - 毫无意义的基本 Scala 错误

    scala - 有人使用securesocial实现了死锁或任何其他授权机制吗?

    scala - sbt 编译需要很长时间才能完成

    scala - 是否可以通过历史编号在 spark-shell 中重新运行命令?