python - Cannot convert RDD to DataFrame (RDD has millions of rows)

Tags: python csv apache-spark pyspark

I am using Apache Spark 1.6.2.

I have a .csv file with roughly 8 million rows that I want to convert into a DataFrame.

But I have to convert it to an RDD first so that I can map it and pull out the data (columns) I want.

Mapping the RDD works fine, but when it comes to converting the RDD to a DataFrame, Spark throws an error:

Traceback (most recent call last):
  File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module>
    result_iso = input_iso.map(extract_iso).toDF()
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema
  File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first
  File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take
  File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob
  File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco
  File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error

Here is my code:

def extract_iso(line):
    fields = line.split(',')
    # keep only the second-to-last column and the second column
    return [fields[-2], fields[1]]

input_iso = sc.textFile("data.csv")
result_iso = input_iso.map(extract_iso).toDF()

data.csv has more than 8 million rows, but when I trim it down to fewer than 500 rows, the program runs fine.

I don't know whether Spark has some row limit or something else is wrong. Is there any way to convert my RDD?

Or is there another way to map a DataFrame the way I map an RDD?

Additional information:

The data is messy: the total number of columns often differs from one row to the next, which is why I need to map it first. However, the data I want is always at exactly the same indices, [1] and [-2] (the second column and the second-to-last column); only the number of columns between them varies from row to row.
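
For illustration only, here are two made-up sample rows (not my real data) showing how the column count can vary while the values I need stay at indices [1] and [-2]:

# hypothetical sample rows, only to illustrate the fixed positions
sample = [
    "id1,ISO-A,x,y,value1,extra",   # 6 columns
    "id2,ISO-B,x,value2,extra",     # 5 columns
]
for line in sample:
    fields = line.split(',')
    print([fields[-2], fields[1]])  # ['value1', 'ISO-A'] then ['value2', 'ISO-B']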

Thank you very much for your answers :)

Best Answer

The most likely cause is that Spark is trying to infer the schema of the newly created DataFrame. Try the second way of turning an RDD into a DataFrame: specify the schema explicitly and pass it to createDataFrame, for example:

>>> from pyspark.sql.types import *
>>> schema = StructType([StructField('a', StringType()), StructField('b', StringType())])
>>> df = sqlContext.createDataFrame(input_iso.map(extract_iso), schema)
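
Applied to the code from the question, a minimal sketch might look like this (the column names 'a' and 'b' are just placeholders, as above; pick names that match your data):

from pyspark.sql.types import StructType, StructField, StringType

# explicit schema, so Spark does not have to sample the 8-million-row RDD to infer types
schema = StructType([
    StructField('a', StringType()),  # value taken from fields[-2]
    StructField('b', StringType()),  # value taken from fields[1]
])

result_iso = sqlContext.createDataFrame(input_iso.map(extract_iso), schema)
result_iso.show(5)  # quick sanity check on a few rows

With the schema supplied up front, Spark skips the _inferSchema / first() sampling step visible in the traceback, which is where the job was failing.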

Regarding "python - Cannot convert RDD to DataFrame (RDD has millions of rows)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41648495/
