我的用例:
从以下形式的 MongoDB 集合中读取数据:
{
"_id" : ObjectId("582cab1b21650fc72055246d"),
"label" : 167.517838916715,
"features" : [
10.0964787450654,
218.621137772497,
18.8833848806122,
11.8010251302327,
1.67037687829152,
22.0766170950477,
11.7122322171201,
12.8014773524475,
8.30441804118235,
29.4821268054137
]
}
并将其传递给 org.apache.spark.ml.regression.LinearRegression 类以创建预测模型。
我的问题:
Spark 连接器将“features”读取为 Array[Double]。
LinearRegression.fit(...) 需要一个带有标签列和特征列的数据集。
Features 列必须是 VectorUDT 类型(因此 DenseVector 或 SparseVector 可以工作)。
我无法将功能从 Array[Double] 映射到 DenseVector,因为没有相关的编码器:
Error:(23, 11) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
.map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}
无法定义自定义编码器。
我的问题:
- 有没有办法可以将 Spark 连接器的配置设置为 将“features”数组作为 Dense/SparseVector 读取?
- 有没有 我可以通过其他方式实现这一点(例如,不使用 中间 .csv 文件并使用 libsvm 加载该文件)?
我的代码:
import com.mongodb.spark.MongoSpark
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{Row, SparkSession}
case class DataPoint(label: Double, features: Array[Double])
object LinearRegressionWithMongo {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("LinearRegressionWithMongo")
.master("local[4]")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/LinearRegressionTest.DataPoints")
.getOrCreate()
import spark.implicits._
val dataPoints = MongoSpark.load(spark)
.map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}
val splitData = dataPoints.randomSplit(Array(0.7, 0.3), 42)
val training = splitData(0)
val test = splitData(1)
val linearRegression = new LinearRegression()
.setLabelCol("label")
.setFeaturesCol("features")
.setRegParam(0.0)
.setElasticNetParam(0.0)
.setMaxIter(100)
.setTol(1e-6)
// Train the model
val startTime = System.nanoTime()
val linearRegressionModel = linearRegression.fit(training)
val elapsedTime = (System.nanoTime() - startTime) / 1e9
println(s"Training time: $elapsedTime seconds")
// Print the weights and intercept for linear regression.
println(s"Weights: ${linearRegressionModel.coefficients} Intercept: ${linearRegressionModel.intercept}")
val modelEvaluator = new ModelEvaluator()
println("Training data results:")
modelEvaluator.evaluateRegressionModel(linearRegressionModel, training, "label")
println("Test data results:")
modelEvaluator.evaluateRegressionModel(linearRegressionModel, test, "label")
spark.stop()
}
}
任何帮助将不胜感激!
最佳答案
对此有一个快速解决方案。如果数据已加载到 DataFrame
叫df
其中有:
-
id
- SQLdouble
. -
features
- SQLarray<double>
.
喜欢这个
val df = Seq((1.0, Array(2.3, 3.4, 4.5))).toDF("id", "features")
你select
下游处理所需的色谱柱:
val idAndFeatures = df.select("id", "features")
转换为静态类型 Dataset
:
val tuples = idAndFeatures.as[(Double, Seq[Double])]
map
并转换回Dataset[Row]
:
val spark: SparkSession = ???
import spark.implicits._
import org.apache.spark.ml.linalg.Vectors
tuples.map { case (id, features) =>
(id, Vectors.dense(features.toArray))
}.toDF("id", "features")
您可以找到详细说明与您当前的方法相比有什么区别 here .
关于mongodb - 无法将数组从 MongoDB 传递到需要向量的 Spark 机器学习函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40641590/