I'm new to Spark and PySpark.
I'm reading a small CSV file (about 40k) into a DataFrame.
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.mllib.linalg import Vectors

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
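As an aside, the F.when(...).otherwise(...) step above just maps 'Y' to 1 and everything else (including nulls) to 0. A minimal pure-Python mirror of that logic, with no cluster needed, would be:

```python
def verified_flag(value):
    # Mirrors F.when(df['verified'] == 'Y', 1).otherwise(0):
    # 1 when the value equals 'Y', 0 for any other value (including None).
    return 1 if value == 'Y' else 0

print([verified_flag(v) for v in ['Y', 'N', None]])  # → [1, 0, 0]
```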
I'm hitting a strange error; it doesn't happen every time, but it does happen fairly often:
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
Once the EOFError has been raised, I don't see it again until I do something that requires interacting with the Spark server.
When I call df2.count(), it shows the [Stage xxx] prompt; that's what I mean by going out to the Spark server. Anything that triggers that kind of work on df2 seems to eventually produce the EOFError again.
It doesn't seem to happen with df (vs. df2), so it looks like it must be something about the df.map() line.
Best Answer
Can you try doing the map after converting the DataFrame to an RDD? You are applying the map function to a DataFrame and then creating a DataFrame from it again. The syntax would be something like
df.rdd.map().toDF()
Please let me know if it works. Thanks.
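A sketch of that suggestion applied to the question's code, assuming the same column layout (label in column 0, feature columns after it). The row-mapping logic is pulled into a plain helper so it can be checked without a cluster; the names to_labeled_row and the sample tuple are illustrative only:

```python
def to_labeled_row(x):
    # x stands in for a Row: x[0] is the label, x[1:] are the feature columns.
    # In the real job the features would be wrapped in Vectors.dense(...);
    # a plain list is used here so the helper runs without Spark.
    return (float(x[0]), [float(v) for v in x[1:]])

# In the Spark job itself, the answer's pattern would look like:
# df2 = df.rdd.map(lambda x: Row(label=float(x[0]),
#                                features=Vectors.dense(x[1:]))).toDF()

print(to_labeled_row((4700734, 0, 0, 0)))  # → (4700734.0, [0.0, 0.0, 0.0])
```

The difference from the question's code is only the explicit .rdd: mapping over the RDD avoids calling map directly on the DataFrame before rebuilding one with toDF().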
Regarding "python - pyspark EOFError after calling map", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36592665/