python - Pyspark 中的 Pickle 错误

标签 python hadoop pyspark pickle

我正在尝试在 pyspark 中解析 xml。我有一个包含许多小 xml 文件的目录,我想解析所有 xml 并将其放入 hdfs 中,为此我在下面编写了代码。

代码:

import xml.etree.ElementTree as ET
from subprocess import Popen, PIPE
import pickle
filenme = sc.wholeTextFiles("/user/root/CD")
dumpoff1 = Popen(["hadoop", "fs", "-put", "-", "/user/cloudera/Demo/Demo.txt"],stdin=PIPE)

def getname(filenm):
   return filenm[1]

def add_hk(filenm):
   source=[]
   global dumpoff1 
   doc = ET.fromstring(filenm)
   for elem1 in doc.findall('.//documentInfo/source'):
       source.append(elem1.text)
       print source[0]
       dumpoff1.stdin.write("%s\n" % source[0]) 

filenme.map(getname).foreach(add_hk)

但是当我运行这个时,我遇到了以下错误。

错误:

File "/opt/cloudera/parcels/CDH-5.11.0-1.cdh5.11.0.p0.34/lib/spark/python/pyspark/cloudpickle.py", line 582, in save_file raise pickle.PicklingError("Cannot pickle files that are not opened for reading") pickle.PicklingError: Cannot pickle files that are not opened for reading

我尝试在 add_hk 中写入 Popen 然后我没有收到 pickle 错误但是 Demo.txt 被覆盖并且只有最新的文件值。请帮忙。

最佳答案

您应该使用 spark SQL 加载您的 xml 文件,然后将它们写入 hdfs:

假设 /user/root/CD/ 是本地路径(否则删除 file://):

df = spark.read.format('com.databricks.spark.xml').options(rowTag='page').load('file:///user/root/CD/*')

你可以把它写成parquet:

df.write.parquet([HDFS path])

关于python - Pyspark 中的 Pickle 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46036542/

相关文章:

python - 伊辛模型大都会算法 : lattice won't equilibrate

scala - 如何在不因org.apache.spark.sql.AnalysisException而失败的情况下插入覆盖Hive表:只能将数据写入到具有单个路径的关系中?

hadoop - 如何从 hadoop 集群中删除已删除的数据节点详细信息

null - pyspark:用于确定 NaN 或 Null 的用户定义函数不起作用

python - PySpark 无法访问使用 StringIndexer 添加的列

python - 如何解决Jupyter Notebook中的 "out of memory"错误?

python - 如何在 SQLAlchemy 中比较 Python 列表和 Postgres 数组?

Python - 如果字符串列表不在字典键中,则从字典列表中删除字典

hadoop - 如何从命令提示符检查 ZooKeeper 是否正在运行或启动?

apache-spark - Spark : How to utilize all cores and memory on spark stand alone cluster,,其中节点内存大小不同