PySpark.RDD.first -> UnpicklingError : NEWOBJ class argument has NULL tp_new

标签 pyspark

我将 python 2.7 与 spark 1.5.1 一起使用,我得到了这个:

df = sqlContext.read.parquet(".....").cache()
df = df.filter(df.foo == 1).select("a","b","c")
def myfun (row):
    return pyspark.sql.Row(....)
rdd = df.map(myfun).cache()
rdd.first()
==> UnpicklingError: NEWOBJ class argument has NULL tp_new

怎么了?

最佳答案

像往常一样,pickling 错误归结为 myfun 被不可 picklable 对象关闭。

像往常一样,解决方案是使用mapPartitions:

import pygeoip
def get_geo (rows):
    db = pygeoip.GeoIP("/usr/share/GeoIP/GeoIPCity.dat")
    for row in rows:
        d = row.asDict()
        d["new"] = db.record_by_addr(row.client_ip) if row.client_ip else "noIP"
        yield d
rdd.mapPartitions(get_geo)

代替 map :

import pygeoip
db = pygeoip.GeoIP("/usr/share/GeoIP/GeoIPCity.dat")
def get_geo (row):
    d = row.asDict()
    d["new"] = db.record_by_addr(row.client_ip) if row.client_ip else "noIP"
    return d
rdd.map(get_geo)

关于PySpark.RDD.first -> UnpicklingError : NEWOBJ class argument has NULL tp_new,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33112441/

相关文章:

json - Spark 2.0.0 使用可变模式读取 json 数据

pyspark - 如何制作 PySpark Row 对象的变异副本?

postgresql - 在 shell 中使用 pyspark 连接到 postgresql 时出错 - 找不到 org.postgresql.Driver 类异常

json - 带有 json 和非 json 列的 pyspark 读取文件

python-2.7 - 将本地 IPython 笔记本连接到气隙集群上的 Spark

pyspark - 将字符串列的 Spark 数据框拆分为多个 bool 列

python - Spark : Warning that task size is too large despite no large, 非分布式文件

dataframe - 计算pyspark数据框列的百分位数

pyspark - 从 Spark 数据帧写入 Snowflake 命名阶段

python - 如何在 Pyspark 中将行分成多行