apache-spark - PySpark:过滤 RDD 元素失败, 'NoneType' 对象不可迭代

标签 apache-spark pyspark

我想过滤 RDD 中字段“status”不等于“OK”的元素。我从 HDFS 上的一组 CSV 文件创建 RDD,然后在尝试过滤之前使用 map 获取我想要的结构:

import csv, StringIO    

files = "/hdfs_path/*.csv"

fields = ["time", "status"]

dial = "excel"

default = {'status': 'OK', 'time': '2014-01-01  00:00:00'}

def loadRecord(line, fieldnames, dialect):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
    try:
        line = reader.next()
        if line is None:
            return default
        else:
            return line
    except:
        return default

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(lambda x: "OK" not in x['status'])

我可以对这个 RDD 做其他事情——例如另一个 mapget 仅某些字段等。但是,当我使用 filter 运行代码时,其中一个任务总是失败并显示我的 filter lambda 函数中出现异常:

'NoneType object is not iterable'

我认为这意味着 filter lambda 正在接收 None,因此我向 loadRecord 添加了代码以避免返回 None 。但是,我仍然遇到同样的错误。它确实适用于小样本数据集,但我的实际数据足够大,我不确定如何检测它的哪一部分可能导致问题。

欢迎任何意见!

最佳答案

首先,将 map(lambda x: loadRecord(x, fields, dial))map(lambda x: (x, loadRecord(x, fields, dial))) - 这样您就可以保存原始记录和解析后的记录。

其次,用 flatMap(test_function) 替换 filter() 调用,并定义 test_function 测试输入的方式,如果第二个传递的参数为 None (已解析的记录),它会返回第一个。

这样,您将获得导致问题的输入行,并在本地测试您的脚本。一般来说,我会添加一行 global default 作为 loadRecord 函数的第一行

关于apache-spark - PySpark:过滤 RDD 元素失败, 'NoneType' 对象不可迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28996575/

相关文章:

apache-spark - 将Parquet文件加载到作为Parquet失败存储的Hive表中(值是null)

python - 在同一 IDE 中使用 Spark 和 Python

apache-spark - 使用 Kerberized Dataproc 集群时,8088 上的资源管理器 UI 无法正常工作

java - 在 Spark 中使用 groupBy 聚合函数计数使用情况

apache-spark - Spark数据框添加新列问题 - 结构化流

postgresql - Spark 从 Postgres JDBC 表读取速度慢

apache-spark - Spark SQL 是否包括用于连接的表流优化?

scala - Spark SQL DataFrame——distinct() 与 dropDuplicates()

cassandra - 将 Cassandra 与 Spark (pyspark) 连接/集成

python - 如何停止 SparkContext