mongodb - 无法将数组从 MongoDB 传递到需要向量的 Spark 机器学习函数

标签 mongodb scala apache-spark machine-learning

我的用例:

从以下形式的 MongoDB 集合中读取数据:

{
    "_id" : ObjectId("582cab1b21650fc72055246d"),
    "label" : 167.517838916715,
    "features" : [ 
        10.0964787450654, 
        218.621137772497, 
        18.8833848806122, 
        11.8010251302327, 
        1.67037687829152, 
        22.0766170950477, 
        11.7122322171201, 
        12.8014773524475, 
        8.30441804118235, 
        29.4821268054137
    ]
}

并将其传递给 org.apache.spark.ml.regression.LinearRegression 类以创建预测模型。

我的问题:

Spark 连接器将“features”读取为 Array[Double]。
LinearRegression.fit(...) 需要一个带有标签列和特征列的数据集。
Features 列必须是 VectorUDT 类型(因此 DenseVector 或 SparseVector 可以工作)。
我无法将功能从 Array[Double] 映射到 DenseVector,因为没有相关的编码器:

Error:(23, 11) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
  .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

无法定义自定义编码器。

我的问题:

  • 有没有办法可以将 Spark 连接器的配置设置为 将“features”数组作为 Dense/SparseVector 读取?
  • 有没有 我可以通过其他方式实现这一点(例如,不使用 中间 .csv 文件并使用 libsvm 加载该文件)?

我的代码:

import com.mongodb.spark.MongoSpark
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{Row, SparkSession}

case class DataPoint(label: Double, features: Array[Double])

object LinearRegressionWithMongo {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("LinearRegressionWithMongo")
      .master("local[4]")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/LinearRegressionTest.DataPoints")
      .getOrCreate()

    import spark.implicits._

    val dataPoints = MongoSpark.load(spark)
      .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

    val splitData = dataPoints.randomSplit(Array(0.7, 0.3), 42)
    val training = splitData(0)
    val test = splitData(1)

    val linearRegression = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setRegParam(0.0)
      .setElasticNetParam(0.0)
      .setMaxIter(100)
      .setTol(1e-6)

    // Train the model
    val startTime = System.nanoTime()
    val linearRegressionModel = linearRegression.fit(training)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    // Print the weights and intercept for linear regression.
    println(s"Weights: ${linearRegressionModel.coefficients} Intercept: ${linearRegressionModel.intercept}")

    val modelEvaluator = new ModelEvaluator()
    println("Training data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, training, "label")
    println("Test data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, test, "label")

    spark.stop()
  }
}

任何帮助将不胜感激!

最佳答案

对此有一个快速解决方案。如果数据已加载到 DataFramedf其中有:

  • id - SQL double .
  • features - SQL array<double> .

喜欢这个

val df = Seq((1.0, Array(2.3, 3.4, 4.5))).toDF("id", "features")

select下游处理所需的色谱柱:

val idAndFeatures = df.select("id", "features")

转换为静态类型 Dataset :

val tuples = idAndFeatures.as[(Double, Seq[Double])]

map并转换回Dataset[Row] :

val spark: SparkSession = ???

import spark.implicits._
import org.apache.spark.ml.linalg.Vectors

tuples.map { case (id, features) => 
  (id, Vectors.dense(features.toArray))
}.toDF("id", "features")

您可以找到详细说明与您当前的方法相比有什么区别 here .

关于mongodb - 无法将数组从 MongoDB 传递到需要向量的 Spark 机器学习函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40641590/

相关文章:

java - 使用 Statistic.stat 时如何避免收集

apache-spark - 如何强制 Spark SQL 进入代码生成模式?

collections - 强制删除 MongoDB 中的集合

javascript - 如何在 Node ,expressjs中的 Elasticsearch 中从 Mongoose 创建索引

Scala - 如何创建匿名类并避免隐藏参数名称

scala - Scala 中的 Monad Transformer 堆栈

apache-spark - 如何计算日期范围内每个月的天数

javascript - 获取 mongodb 查询中项目的索引

mongodb - 看不到关系 - Loopback 4 + MongoDB + openapi-to-graphql

Scala View 过滤器不懒惰?