apache-spark - PySpark pyspark.rdd.PipelinedRDD doesn't work with models

Tags: apache-spark pyspark apache-spark-sql

I can't pass an RDD object to a PySpark logistic regression model. I'm using Spark 2.0.1. Any help would be appreciated.

>>> from pyspark import SparkContext, HiveContext
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.classification import LogisticRegressionWithLBFGS
>>> from pyspark.mllib.util import MLUtils
>>>
>>> table_name = "api_model"
>>> target_col = "dv"
>>>
>>>
>>> hc = HiveContext(sc)
>>>
>>> # get the table from the hive context
... df = hc.table(table_name)
>>> df = df.select(target_col, *[col for col in df.columns if col != target_col])
>>>
>>> # map through the data to produce an rdd of labeled points
... rdd_of_labeled_points = df.rdd.map(lambda row: LabeledPoint(row[0], row[1:]))
>>> print (rdd_of_labeled_points.take(3))
[LabeledPoint(1.0, [0.0,2.520784472,0.0,0.0,0.0,2.004684436,2.000347299,0.0,2.228387043,2.228387043,0.0,0.0,0.0,0.0,0.0,0.0]), LabeledPoint(0.0, [2.857738033,0.0,0.0,2.619965104,0.0,2.004684436,2.000347299,0.0,2.228387043,2.228387043,0.0,0.0,0.0,0.0,0.0,0.0]), LabeledPoint(0.0, [2.857738033,0.0,2.061393767,0.0,0.0,2.004684436,0.0,0.0,2.228387043,2.228387043,0.0,0.0,0.0,0.0,0.0,0.0])]
>>>
>>> from pyspark.ml.classification import LogisticRegression
>>> lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
>>> lrModel = lr.fit(sc.parallelize(rdd_of_labeled_points))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 432, in parallelize
    c = list(c)    # Make it a list so we can compute its length
TypeError: 'PipelinedRDD' object is not iterable

Best answer

That's because you are calling sc.parallelize on something that is already an RDD. This is the mistake:

sc.parallelize(rdd_of_labeled_points)

You are also mixing spark-ml and spark-mllib:

from pyspark.mllib.classification import LogisticRegressionWithLBFGS

from pyspark.ml.classification import LogisticRegression

lrModel = lr.fit(sc.parallelize(rdd_of_labeled_points))

In the first case (spark-mllib), you train the model on the RDD directly, as noted above, e.g.:

model = LogisticRegressionWithLBFGS.train(rdd_of_labeled_points, iterations=10)

In the second case (spark-ml), you need to convert the RDD into a DataFrame before feeding it to the model.

I strongly recommend reading the official documentation; it also has plenty of examples to get you started.

Remember:

  • spark-mllib works with RDDs.
  • spark-ml works with DataFrames.

About "apache-spark - PySpark pyspark.rdd.PipelinedRDD doesn't work with models": a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44405675/
