python - Spark 无法 pickle method_descriptor

标签 python hbase apache-spark pickle happybase

我收到这个奇怪的错误信息

15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
  File "/home/user/inverted-index.py", line 78, in <module>
    print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
  File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
    return self.mapPartitions(topIterator).reduce(merge)
  File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
    pickled_command = ser.dumps(command)
  File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
    cp.dump(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj, modList)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 467, in save_global
    d),obj=obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 631, in save_reduce
    save(args)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 442, in save_global
    raise pickle.PicklingError("Can't pickle builtin %s" % obj)
pickle.PicklingError: Can't pickle builtin <type 'method_descriptor'>

我的更新函数返回一个类型为 (key, (value1, value2)) 的元组列表所有这些都是字符串,如下所示:
def update(doc):
    doc_id  = doc[0][path_len:-ext_len] #actual file name
    content = doc[1].lower()

    new_fi = regex.split(content)
    old_fi = fi_table.row(doc_id)

    fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})

    if not old_fi:
        return [(term, ('add', doc_id)) for term in new_fi]
    else:
        new_fi = set(new_fi)
        old_fi = set(old_fi['cf:col'].split(','))
        return [(term, ('add', doc_id)) for term in new_fi - old_fi] + \
               [(term, ('del', doc_id)) for term in old_fi - new_fi]

编辑:
问题在于这两个 hbase 函数,行和放置。当我评论它们时,两个代码都有效(将 old_fi 设置为空字典),但如果其中一个运行,则会产生上述错误。我使用happybase在python中操作hbase。有人可以解释我出了什么问题吗?

最佳答案

Spark 尝试序列化连接对象,以便它可以在执行器内部使用,这肯定会失败,因为反序列化的 db 连接对象无法向另一个范围(甚至计算机)授予读/写权限。可以通过尝试广播连接对象来重现该问题。对于此实例,序列化 i/o 对象时存在问题。

通过连接到 map 函数内的数据库,该问题得到了部分解决。由于 map 函数中每个 RDD 元素都会有太多连接,我不得不切换到分区处理以将 db 连接从 20k 减少到大约 8-64(基于分区数)。 Spark 开发人员应该考虑为执行程序创建一个初始化函数/脚本,以避免这些类型的死胡同问题。

所以假设我让每个节点都执行这个 init 函数,然后每个节点都会连接到数据库(一些 conn 池,或单独的 Zookeeper 节点),因为 init 函数和 map 函数将共享相同的作用域,然后问题不见了,所以您编写的代码比我找到的解决方法更快。在执行结束时,spark 将释放/卸载这些定义的变量,程序将结束。

关于python - Spark 无法 pickle method_descriptor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28142578/

相关文章:

python - 你如何找到字典中的第一个键?

java - Spark Streaming : Using PairRDD. saveAsNewHadoopDataset函数保存数据到HBase

arrays - Spark scala 将列名称映射到值为 true 的输出 arraytype 列

python - youtube-dl:设置元数据属性并从 python 嵌入缩略图?

python - 如何通过就地过滤来修改 python 集合?

python - 每年用 Pandas 绘制箱线图

hadoop - HBase LZO表扫描导致RegionServer关闭

hadoop - Nutch 和 HBase 配置错误

python - 评估两个数据帧中行的所有组合

python - 使用spark SQL读取Parquet格式的不存在列