考虑经典的“字数统计”程序。它计算某个目录中所有文件中的单词数。 Master 接收一些目录并在 Worker Actor 之间拆分作业(每个 Worker 使用一个文件)。这是伪代码:
class WordCountWorker extends Actor {
def receive = {
case FileToCount(fileName:String) =>
val count = countWords(fileName)
sender ! WordCount(fileName, count)
}
}
class WordCountMaster extends Actor {
def receive = {
case StartCounting(docRoot) => // sending each file to worker
val workers = createWorkers()
fileNames = scanFiles(docRoot)
sendToWorkers(fileNames, workers)
case WordCount(fileName, count) => // aggregating results
...
}
}
但我想按计划(例如每 1 分钟)运行这个 Word Count 程序,提供不同的目录进行扫描。
Akka 为调度消息传递提供了很好的方法:
system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))
但是上述调度程序的问题是在调度程序通过tick发送新消息时开始的,但之前的消息尚未处理(例如我发送消息扫描某个大目录,1秒后我发送另一条消息扫描另一个目录,所以操作第一个目录的处理尚未完成)。所以结果我的
WordCountMaster
将收到WordCount
来自处理不同目录的工作人员的消息。作为一种解决方法,而不是安排消息发送,我可以安排一些代码块的执行,每次都会创建新的
WordCountMaster
. IE。一个目录 = 一个 WordCountMaster
.但我认为它效率低下,而且我需要注意为 WordCountMaster
提供唯一名称避免InvalidActorNameException
.所以我的问题是:我应该创建新的
WordCountMaster
对于我在上面段落中提到的每个刻度?或者有一些更好的想法/模式如何重新设计这个程序以支持调度?一些更新:
如果为每个目录创建一个主 Actor ,我会遇到一些问题:
InvalidActorNameException: actor name [WordCountMaster] is not unique!
和
InvalidActorNameException: actor name [WordCountWorker ] is not unique!
我可以克服这个问题,只是不提供 Actor 姓名。但在这种情况下,我的 Actor 会收到自动生成的名称,例如
$a
, $b
等等。这对我不好。我想将我的路由器配置排除到
application.conf
. IE。我想为每个 WordCountWorker
提供相同的配置路由器。但由于我不控制 Actor 姓名,我不能使用下面的配置,因为我不知道 Actor 姓名: /wordCountWorker{
router = smallest-mailbox-pool
nr-of-instances = 5
dispatcher = word-counter-dispatcher
}
最佳答案
我不是 Akka 专家,但我认为每个聚合都有一个 Actor 的方法并不是低效的。您需要以某种方式保持并发聚合分离。您可以给每个聚合一个 id,以便在唯一的主 actor 中将它们用 id 分隔,或者您可以使用 Akka actor 命名和生命周期逻辑,并将每个计数轮的每个聚合委托(delegate)给将存活的 actor只是为了那个聚合逻辑。
对我来说,每个聚合使用一个 Actor 似乎更优雅。
另请注意,Akka 有一个聚合模式的实现,如 here 所述。
关于scala - Akka 调度模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30446507/