scala - 控制 Akka 中消耗大量内存的 actor 的生成

标签 scala concurrency machine-learning akka

我使用 akka 的 actor 模型构建了一个分布式流机器学习模型。通过向 Actor 发送训练实例(训练数据)来异步训练模型。对这些数据的训练会占用计算时间并改变参与者的状态。

目前我正在使用历史数据来训练模型。我想运行一堆不同配置的模型,这些模型在相同的数据上进行训练,并查看不同的集成指标有何不同。本质上,这是对 Thread.sleep(1) 和表示计算时间和状态的数据数组进行的操作的简单得多的模拟。

implicit val as = ActorSystem()

case object Report

case class Model(dataSize: Int) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) == trainingData
      }
      case Report => {
          println(s"Finished $dataSize")
          context.stop(self)
        }
      }
    })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val trainingData = Array.fill(5000)(Random.nextInt)

val dataSizeParams = (1 to 500)

接下来我使用 for 循环来改变参数(由 dataSizeParams 数组表示)

for {
  param <- dataSizeParams
} {
  // make model with params
  val model = Model(param)
  for {
    trainingInstance <- trainingData
  } {
    model.train(trainingInstance)
  }
  model.report
}

for 循环绝对是我想做的事情的错误方式。它并行启动所有不同的模型。当 dataSizeParams 在 1 到 500 范围内时它效果很好,但是如果我将其提高到较高的值,我的模型每个都会开始占用明显的内存块。我想出的是下面的代码。本质上,我有一个模型大师,他可以根据他收到的运行消息的数量来控制同时运行的模型数量。现在,每个模型都包含对此主参与者的引用,并在处理完成后向他发送一条消息:

// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run

case class Model(dataSize: Int, master: ActorRef) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Tread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) == trainingData
      }
      case Report => {
          println(s"Finished $dataSize")
          master ! ImDone
          context.stop(self)
        }
      }
    })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val master: ActorRef = actor(new Act {
  var paramRuns = dataSizeParams.toIterator
  become {
    case Run => {
      if (paramRuns.hasNext) {
        val model = Model(paramRuns.next(), self)
        for {
          trainingInstance <- trainingData
        } {
          model.train(trainingInstance)
        }
        model.report
      } else {
        println("No more to run")
        context.stop(self)
      }
    }
    case ImDone =>  {
      self ! Run
    }
  }
})

master ! Run

主代码没有任何问题(我可以看到)。我可以严格控制一次生成的模型数量,但我觉得我缺少一种更简单/干净/开箱即用的方法来做到这一点。另外,我想知道是否有任何巧妙的方法来限制同时运行的模型数量,例如查看系统的 CPU 和内存使用情况。

最佳答案

您正在寻找工作拉动模式。我强烈推荐 Akka 开发人员撰写的这篇博文:

http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2

我们在 Akka 的集群功能之上使用了它的变体,以避免恶意并发。通过让工作参与者工作而不是让主管工作,您可以通过简单地限制数量来优雅地控制工作量(以及CPU和内存使用量) worker Actor 。

与纯路由器相比,这有一些优点:更容易跟踪故障(如该帖子中所述),并且工作不会在邮箱中滞留(可能会丢失)。

此外,如果您使用远程处理,我建议您不要在消息中发送大量数据。让工作节点在触发时自行从另一个源提取数据。我们使用 S3。

关于scala - 控制 Akka 中消耗大量内存的 actor 的生成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21054434/

相关文章:

postgresql - CREATE SCHEMA IF NOT EXISTS 引发重复键错误

java - 信号量 : Permit acquired in one thread can be released from another thread - Example

python - 修改pandas中的列类型

scala - 在编译时强制 HList 类型的上限

jquery - Scala.js:选择和操作生成的 SVG

scala - 为什么 Scala 中没有 i++?

scala - SBT:我可以动态添加基于另一个库的依赖项吗?

c - Atom.io 缺少 sys/wait.h 或允许 fork()

python - 多元线性回归成本太高

machine-learning - CNN处理图片时如何计算参数个数