我想过滤
RDD 中字段“status”不等于“OK”的元素。我从 HDFS 上的一组 CSV 文件创建 RDD,然后在尝试过滤
之前使用 map
获取我想要的结构:
import csv, StringIO
files = "/hdfs_path/*.csv"
fields = ["time", "status"]
dial = "excel"
default = {'status': 'OK', 'time': '2014-01-01 00:00:00'}
def loadRecord(line, fieldnames, dialect):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
try:
line = reader.next()
if line is None:
return default
else:
return line
except:
return default
harmonics = sc.textFile(files) \
.map(lambda x: loadRecord(x, fields, dial)) \
.filter(lambda x: "OK" not in x['status'])
我可以对这个 RDD 做其他事情——例如另一个 map
到 get
仅某些字段等。但是,当我使用 filter
运行代码时,其中一个任务总是失败并显示我的 filter
lambda 函数中出现异常:
'NoneType object is not iterable'
我认为这意味着 filter
lambda 正在接收 None
,因此我向 loadRecord
添加了代码以避免返回 None
。但是,我仍然遇到同样的错误。它确实适用于小样本数据集,但我的实际数据足够大,我不确定如何检测它的哪一部分可能导致问题。
欢迎任何意见!
最佳答案
首先,将 map(lambda x: loadRecord(x, fields, dial))
与 map(lambda x: (x, loadRecord(x, fields, dial)))
- 这样您就可以保存原始记录和解析后的记录。
其次,用 flatMap(test_function)
替换 filter()
调用,并定义 test_function
测试输入的方式,如果第二个传递的参数为 None (已解析的记录),它会返回第一个。
这样,您将获得导致问题的输入行,并在本地测试您的脚本。一般来说,我会添加一行 global default
作为 loadRecord
函数的第一行
关于apache-spark - PySpark:过滤 RDD 元素失败, 'NoneType' 对象不可迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28996575/