scala - Spark 流的迭代算法

标签 scala iteration apache-spark dstream

因此我了解到 Spark 可以在单个 RDD 上执行迭代算法,例如逻辑回归。

    val points = spark.textFile(...).map(parsePoint).cache()
    var w = Vector.random(D) // current separating plane
    for (i <- 1 to ITERATIONS) {
      val gradient = points.map(p =>
        (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
      ).reduce(_ + _)
      w -= gradient
    }

上面的示例是迭代的,因为它维护一个全局状态 w,该状态在每次迭代后更新,并且其更新值将在下一次迭代中使用。 Spark 流中可以使用此功能吗?考虑相同的示例,只不过现在 points 是一个 DStream。在这种情况下,您可以创建一个新的 DStream 来计算梯度

val gradient = points.map(p =>
            (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
          ).reduce(_ + _)

但是你将如何处理全局状态w。看起来 w 也必须是 DStream(也许使用 updateStateByKey),但随后它的最新值需要以某种方式传递到 point 中code> map 函数,我认为这是不可能的。我认为 DStreams 不能以这种方式进行通信。我是否正确,或者是否可以在 Spark Streaming 中进行这样的迭代计算?

最佳答案

我刚刚发现使用 foreachRDD 函数非常简单。 MLlib 实际上提供了可以使用 DStreams 训练的模型,我在 streamingLinearAlgorithm 中找到了答案。代码。看起来您可以将全局更新变量本地保存在驱动程序中并在 .foreachRDD 中更新它,因此实际上不需要将其转换为 DStream 本身。所以你可以将其应用到我提供的示例中,例如

points.foreachRDD{(rdd,time) =>

     val gradient=rdd.map(p=> (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
     )).reduce(_ + _)

  w -= gradient

  }

关于scala - Spark 流的迭代算法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29063392/

相关文章:

scala - 如何在Scala中模拟 “assign-once” var?

scala - 根据参数双向循环 Scala

java - 映射器函数中的静态变量值未更改

mysql - 在 MySQL 中循环遍历枚举

c# - 将嵌套的 for 循环转换为单个 LINQ 语句

apache-spark - 断言错误 : assertion failed: No plan for DeleteFromTable In Databricks

java - 我可以在 Play Framework 2 中使用 specs2 测试 Java Controller 吗?

scala - 在 Scala 中将选项转换为任一

apache-spark - Spark 作业因 java.lang.ArrayIndexOutOfBoundsException : 1 而失败

scala - Spark 中 Stage 的详细信息