scala - Spark - Create an RDD of (label, features) pairs from a CSV file

Tags: scala apache-spark linear-regression

I have a CSV file and want to run a simple LinearRegressionWithSGD on the data.

Sample data is shown below (the file contains 99 rows in total, including the header row); the goal is to predict the y_3 variable:

y_3,x_6,x_7,x_73_1,x_73_2,x_73_3,x_8
2995.3846153846152,17.0,1800.0,0.0,1.0,0.0,12.0
2236.304347826087,17.0,1432.0,1.0,0.0,0.0,12.0
2001.9512195121952,35.0,1432.0,0.0,1.0,0.0,5.0
992.4324324324324,17.0,1430.0,1.0,0.0,0.0,12.0
4386.666666666667,26.0,1430.0,0.0,0.0,1.0,25.0
1335.9036144578313,17.0,1432.0,0.0,1.0,0.0,5.0
1097.560975609756,17.0,1100.0,0.0,1.0,0.0,5.0
3526.6666666666665,26.0,1432.0,0.0,1.0,0.0,12.0
506.8421052631579,17.0,1430.0,1.0,0.0,0.0,5.0
2095.890410958904,35.0,1430.0,1.0,0.0,0.0,12.0
720.0,35.0,1430.0,1.0,0.0,0.0,5.0
2416.5,17.0,1432.0,0.0,0.0,1.0,12.0
3306.6666666666665,35.0,1800.0,0.0,0.0,1.0,12.0
6105.974025974026,35.0,1800.0,1.0,0.0,0.0,25.0
1400.4624277456646,35.0,1800.0,1.0,0.0,0.0,5.0
1414.5454545454545,26.0,1430.0,1.0,0.0,0.0,12.0
5204.68085106383,26.0,1800.0,0.0,0.0,1.0,25.0
1812.2222222222222,17.0,1800.0,1.0,0.0,0.0,12.0
2763.5928143712576,35.0,1100.0,1.0,0.0,0.0,12.0

I have read the data with the following command:

val data = sc.textFile(datadir + "/data_2.csv");

But when I try to create an RDD of (label, features) pairs with the following command:

val parsedData = data.map { line =>
    val parts = line.split(',')
    LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

an error appears in the output (most likely a NumberFormatException, since the header values such as y_3 cannot be converted to Double).

So I cannot move on to training the model. Any help?

P.S. I am running Spark with the Scala IDE on Windows 7 x64.

Best Answer

After some struggling, I found the solution. The first problem was the header row, and the second was the mapping function (the features are comma-separated, so splitting parts(1) on spaces only picks up the second column). Here is the complete solution:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

//To read the file
val csv = sc.textFile(datadir + "/data_2.csv")

//To find the header
val header = csv.first

//To remove the header (keep every line that is not equal to the header line)
val data = csv.filter(_ != header)

//To create a RDD of (label, features) pairs: the label is the first
//column, the features are all the remaining columns
val parsedData = data.map { line =>
    val parts = line.split(',')
    LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
}.cache()
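
As a quick sanity check, parsedData.first should now return the first data row as a LabeledPoint, printed as (2995.3846153846152,[17.0,1800.0,0.0,1.0,0.0,12.0]). From there, training should be able to proceed; the sketch below follows the standard MLlib pattern, with an illustrative (untuned) iteration count and step size. SGD is sensitive to feature scale, which is why a small step size is used here, and normalizing the features first may help convergence.

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

//To train a linear model with SGD (hyperparameters are example values, not tuned ones)
val numIterations = 100
val stepSize = 0.0001
val model = LinearRegressionWithSGD.train(parsedData, numIterations, stepSize)

//To evaluate the model on the training data via mean squared error
val valuesAndPreds = parsedData.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
println("training Mean Squared Error = " + MSE)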

Hope this saves you some time.

Source: https://stackoverflow.com/questions/30298523/
