multithreading - akka actor 之间的工作负载平衡

标签 multithreading scala web-crawler akka actor

我有2个akka actor用于抓取链接,即查找页面X中的所有链接,然后查找从X链接的所有页面中的所有链接,等等...

我希望他们或多或少以相同的速度进步,但往往其中一个会挨饿,而另一个会消耗所有资源。

我尝试过以下方法(简化)。 单页抓取由以下参与者完成:

class Crawler extends Actor {
  def receive = {
    case Crawl(url, kind) =>
      // download url
      // extract links
      sender ! Parsed(url, links, kind)
  }
}

方法 1:

class Coordinator extends Actor {
  val linksA = ...
  val linksB = ...
  def receive = {
    case Parsed(url, links, kind) =>
      val store = if (kind == kindA) linksA else linksB
      val newLinks = links -- store
      store ++= links
      newLinks.foreach { link =>
        val crawler = context.actorOf(Props[Crawler])
        crawler ! Crawl(link, kind)
      }
  }
}

方法 2:

class Coordinator extends Actor {
  val linksA = ...
  val linksB = ...
  val rrProps = Props[Crawler].withRouter(RoundRobinRouter(nrOfInstances = 10)
  val crawlerA = context.actorOf(rrProps)
  val crawlerB = context.actorOf(rrProps)
  def receive = {
    case Parsed(url, links, kind) =>
      val store = if (kind == kindA) linksA else linksB
      val newLinks = links -- store
      store ++= links
      newLinks.foreach { link =>
        if (kind == kindA) crawlerA ! Crawl(link, kind)
        else crawlerB ! Crawl(link, kind)
      }
  }
}

第二种方法使事情稍微好一点,但并没有完全解决问题。

有没有好的方法可以让两种爬虫以大致相同的速度前进?我应该在它们之间发送消息以依次解锁对方吗?

最佳答案

我正在开发一个类似的程序,其中工作人员的资源成本不均匀(在我的例子中,任务是执行数据库查询并将结果转储到另一个数据库中,但就像抓取不同的网站会有不同的成本一样,所以不同的查询也会有不同的成本)。我采用了两种处理此问题的方法:

  1. RoundRobinRouter 替换为 SmallestMailboxRouter
  2. 不要让协调器一次发送所有消息 - 而是分批发送它们,在您的情况下,您有十个工作人员,因此发送四十条消息应该会让他们一开始很忙。每当工作人员完成任务时,它都会向协调器发送一条消息,此时协调器会发出另一条消息,该消息可能会发送给刚刚完成其任务的工作人员。 (您也可以批量执行此操作,即在收到 n 个“任务完成”消息后,Coordinator 发送另一个 n 消息,但不要'不要让 n 太高,否则一些任务极短的工作人员可能会闲置。)

第三种选择是在所有参与者之间欺骗并共享一个ConcurrentLinkedQueue:填充队列后,Coordinator向工作人员发送一条“开始”消息,并且工作人员然后轮询队列直到它为空。

关于multithreading - akka actor 之间的工作负载平衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30605942/

相关文章:

python - NSWindow 拖动区域应该只在主线程上无效!这将在未来抛出异常

java - 如何同时(从不同的线程)将数据插入一个 Android SQLite 数据库中的不同表中?

scala - 函数式编程 setter

scala - 通过 sbt 添加 Joda-Time 时出现编译错误,但如果我通过 ide 添加它则运行正常?

c# - 网络爬虫超时

java - 了解加入()

scala - 更高程度的更高种类?

GitHub 存储库未在 Google 搜索中列出 - 无法提交 url

python - 使用转义的 ascii 字符串正确解析 html 页面

c++ - 对 std::vector 元素的赋值是线程安全的吗?