因此我了解到 Spark 可以在单个 RDD 上执行迭代算法,例如逻辑回归。
val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D) // current separating plane
for (i <- 1 to ITERATIONS) {
val gradient = points.map(p =>
(1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
).reduce(_ + _)
w -= gradient
}
上面的示例是迭代的,因为它维护一个全局状态 w
,该状态在每次迭代后更新,并且其更新值将在下一次迭代中使用。 Spark 流中可以使用此功能吗?考虑相同的示例,只不过现在 points
是一个 DStream。在这种情况下,您可以创建一个新的 DStream 来计算梯度
val gradient = points.map(p =>
(1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
).reduce(_ + _)
但是你将如何处理全局状态w
。看起来 w
也必须是 DStream(也许使用 updateStateByKey
),但随后它的最新值需要以某种方式传递到 point
中code> map 函数,我认为这是不可能的。我认为 DStreams 不能以这种方式进行通信。我是否正确,或者是否可以在 Spark Streaming 中进行这样的迭代计算?
最佳答案
我刚刚发现使用 foreachRDD 函数非常简单。 MLlib 实际上提供了可以使用 DStreams 训练的模型,我在 streamingLinearAlgorithm 中找到了答案。代码。看起来您可以将全局更新变量本地保存在驱动程序中并在 .foreachRDD 中更新它,因此实际上不需要将其转换为 DStream 本身。所以你可以将其应用到我提供的示例中,例如
points.foreachRDD{(rdd,time) =>
val gradient=rdd.map(p=> (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
)).reduce(_ + _)
w -= gradient
}
关于scala - Spark 流的迭代算法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29063392/