python - Convert an RDD of numpy arrays to a PySpark DataFrame

Tags: python numpy apache-spark pyspark rdd

Trying to convert an RDD made of numpy arrays into a DataFrame in PySpark fails with the error below.

Here is the snippet that triggers it; even after reading the traceback I'm not sure I can pinpoint where the error actually originates...

Does anyone know how to work around it?

Thanks a lot!

In [111]: rddUser.take(5)

Out[111]:

[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32')]

And then it all falls apart:

In [110]: rddUser.toDF(schema=None).show()  

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()

     62         [Row(name=u'Alice', age=1)]
     63         """
---> 64         return sqlContext.createDataFrame(self, schema, sampleRatio)
     65 
     66     RDD.toDF = toDF

    421 
    422         if isinstance(data, RDD):
--> 423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
    425             rdd, schema = self._createFromLocal(data, schema)

    308         """
    309         if schema is None or isinstance(schema, (list, tuple)):
--> 310             struct = self._inferSchema(rdd, samplingRatio)
    311             converter = _create_converter(struct)
    312             rdd = rdd.map(converter)

    253         """
    254         first = rdd.first()
--> 255         if not first:
    256             raise ValueError("The first row in RDD is empty, "
    257                              "can not infer schema")

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

Best answer

The failure happens inside schema inference: as the last traceback frame shows, _inferSchema evaluates "if not first:" on the first row, and the truth value of a multi-element numpy array is ambiguous. Mapping each array to a plain Python list with tolist before calling toDF sidesteps this. If the RDD is defined as:

import numpy as np

rdd = spark.sparkContext.parallelize([
    np.array([u'1059178934871294116', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0']),
    np.array([u'102254941859441333', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0'])
])

df = rdd.map(lambda x: x.tolist()).toDF(["user_id"])
df.show()

# +-------------------+---+---+---+---+---+
# |            user_id| _2| _3| _4| _5| _6|
# +-------------------+---+---+---+---+---+
# |1059178934871294116|1.0|0.0|0.0|0.0|1.0|
# | 102254941859441333|1.0|0.0|0.0|0.0|1.0|
# +-------------------+---+---+---+---+---+
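
Note that tolist() does not change the element types: the arrays hold unicode strings, so every column of df above is still a string. A minimal sketch casting the value columns to doubles (df and the auto-generated column names _2 to _6 are taken from the output above):

from pyspark.sql.functions import col

# Cast everything except user_id to double; the remaining columns
# carry the auto-generated names _2.._6 shown above.
df_numeric = df.select(
    col("user_id"),
    *[col(c).cast("double") for c in df.columns[1:]]
)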

But given your comments, I assume you want to use this with ml. In that case something like this would work better:

from pyspark.ml.linalg import DenseVector

(rdd
   .map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
   .toDF(["user_id", "features"])
   .show(2, False))
# +-------------------+---------------------+
# |user_id            |features             |
# +-------------------+---------------------+
# |1059178934871294116|[1.0,0.0,0.0,0.0,1.0]|
# |102254941859441333 |[1.0,0.0,0.0,0.0,1.0]|
# +-------------------+---------------------+
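
As a quick sanity check (binding the result to a name, which the chained expression above does not do), the features column comes out as a vector type that pyspark.ml estimators accept directly; note that DenseVector casts the unicode strings to float64 on construction:

df_ml = (rdd
    .map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
    .toDF(["user_id", "features"]))

df_ml.printSchema()
# root
#  |-- user_id: string (nullable = true)
#  |-- features: vector (nullable = true)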

You should also take a look at pyspark.ml.feature.OneHotEncoder.
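
A minimal sketch of that encoder on a toy, hypothetical DataFrame (the category column and its values are made up for illustration; note the API changed between Spark versions):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Hypothetical toy DataFrame with one categorical column.
df_cat = spark.createDataFrame(
    [("a",), ("b",), ("a",), ("c",)], ["category"])

# Map the string categories to numeric indices first.
indexed = (StringIndexer(inputCol="category", outputCol="category_index")
           .fit(df_cat).transform(df_cat))

# Spark 2.x: OneHotEncoder is a plain Transformer.
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
encoded = encoder.transform(indexed)
# Spark >= 3.0: OneHotEncoder is an Estimator, so fit it first:
# encoded = encoder.fit(indexed).transform(indexed)

encoded.show()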

Regarding "python - Convert an RDD of numpy arrays to a PySpark DataFrame", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48499139/
