我尝试使用 pyspark 从 HDFS 提取文件后尝试修改我的文件,然后我想将其保存在HDFS中,因为我已经编写了以下代码。
代码:
import subprocess
from subprocess import Popen, PIPE
from pyspark import SparkContext
cat = sc.textFile("/user/root/parsed.txt")
hrk = "@"
for line in cat.collect():
if (code == "ID"):
line =line.strip() + "|"+hrk
line.saveAsTextFile("/user/root/testsprk")
print(line)
但是,当我运行代码时,我得到了以下错误。
错误:
Traceback (most recent call last):
File "<stdin>", line 30, in <module>
AttributeError: 'unicode' object has no attribute 'saveAsTextFile'
我知道行变量存在一些问题,但我无法修复它。
最佳答案
这是因为您正在收集所有数据,这意味着收集的不是RDD,而普通的列表和行只是一个字符串。
您不应该收集驱动程序上的所有数据。相反,请先使用RDD.map
,然后再使用RDD.saveAsTextFile
def add_hrk_on_id(line):
if (code == "ID"):
return line.strip() + "|"+hrk
else
return line
cat.map(add_hrk_on_id).saveAsTextFile(path)
关于python - 'unicode'对象没有属性 'saveAsTextFile',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43999108/