我有2个akka actor用于抓取链接,即查找页面X中的所有链接,然后查找从X链接的所有页面中的所有链接,等等...
我希望他们或多或少以相同的速度进步,但往往其中一个会挨饿,而另一个会消耗所有资源。
我尝试过以下方法(简化)。 单页抓取由以下参与者完成:
class Crawler extends Actor {
def receive = {
case Crawl(url, kind) =>
// download url
// extract links
sender ! Parsed(url, links, kind)
}
}
方法 1:
class Coordinator extends Actor {
val linksA = ...
val linksB = ...
def receive = {
case Parsed(url, links, kind) =>
val store = if (kind == kindA) linksA else linksB
val newLinks = links -- store
store ++= links
newLinks.foreach { link =>
val crawler = context.actorOf(Props[Crawler])
crawler ! Crawl(link, kind)
}
}
}
方法 2:
class Coordinator extends Actor {
val linksA = ...
val linksB = ...
val rrProps = Props[Crawler].withRouter(RoundRobinRouter(nrOfInstances = 10)
val crawlerA = context.actorOf(rrProps)
val crawlerB = context.actorOf(rrProps)
def receive = {
case Parsed(url, links, kind) =>
val store = if (kind == kindA) linksA else linksB
val newLinks = links -- store
store ++= links
newLinks.foreach { link =>
if (kind == kindA) crawlerA ! Crawl(link, kind)
else crawlerB ! Crawl(link, kind)
}
}
}
第二种方法使事情稍微好一点,但并没有完全解决问题。
有没有好的方法可以让两种爬虫以大致相同的速度前进?我应该在它们之间发送消息以依次解锁对方吗?
最佳答案
我正在开发一个类似的程序,其中工作人员的资源成本不均匀(在我的例子中,任务是执行数据库查询并将结果转储到另一个数据库中,但就像抓取不同的网站会有不同的成本一样,所以不同的查询也会有不同的成本)。我采用了两种处理此问题的方法:
- 将
RoundRobinRouter
替换为SmallestMailboxRouter
- 不要让
协调器
一次发送所有消息 - 而是分批发送它们,在您的情况下,您有十个工作人员,因此发送四十条消息应该会让他们一开始很忙。每当工作人员完成任务时,它都会向协调器发送一条消息,此时协调器会发出另一条消息,该消息可能会发送给刚刚完成其任务的工作人员。 (您也可以批量执行此操作,即在收到n
个“任务完成”消息后,Coordinator
发送另一个n
消息,但不要'不要让n
太高,否则一些任务极短的工作人员可能会闲置。)
第三种选择是在所有参与者之间欺骗并共享一个ConcurrentLinkedQueue
:填充队列后,Coordinator
向工作人员发送一条“开始”消息,并且工作人员然后轮询队列直到它为空。
关于multithreading - akka actor 之间的工作负载平衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30605942/