python-3.x - Pyspark Column.isin() 用于大集合

标签 python-3.x apache-spark ipython pyspark

代码:

views = sdf \
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()

for sess_id, rows in views:
    # do something
PRODUCTSset .它很大,大约有10000件元素。

代码失败:
--> 9 for sess_id, rows in views:

/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142         for item in serializer.load_stream(rf):

/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139                 yield self._read_with_length(stream)

/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156         length = read_int(stream)

/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543     length = stream.read(4)

/opt/conda/lib/python3.5/socket.py in readinto(self, b)
    574             try:
--> 575                 return self._sock.recv_into(b)
    576             except timeout:
    577                 self._timeout_occurred = True

timeout: timed out

但是当我做 PRODUCTS设置小一点一切都好。我试图在 Spark 配置中更改一些超时值。它没有帮助。如何避免此类崩溃?

更新
PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()

views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()

for sess_id, rows in views:
    # do ...

现在 PRODUCTS是一个数据框。我使用 join .得到同样的错误..

更新 2

尝试这个解决方案:
views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID'])
views.cache()

for sess_id, rows in views.toLocalIterator():
    pass

一段时间后得到一个很长的错误:
Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....

这个错误只出现过一次!现在我得到了相同的超时异常!

最佳答案

我相信这基本上是由于 toLocalIterator() 的实现中的一个错误造成的。在 pyspark 2.0.2 .您可以在这里阅读更多信息:[SPARK-18281][SQL][PySpark] Remove timeout for reading data through socket for local iterator .

似乎修复将在 2.0.2 之后的下一次更新中可用并在 2.1.x释放。如果您想自己临时修复它,您可以应用上述问题中的更改:

rdd.py 的第 138 行附近替换它(在实际的 Spark 集群上,您似乎需要更新 rdd.py 内的 pyspark.zip :

try:
    rf = sock.makefile("rb", 65536)
    for item in serializer.load_stream(rf):
        yield item
finally:
    sock.close()

有了这个:
sock.settimeout(None)  # << this is they key line that disables timeout after the initial connection
return serializer.load_stream(sock.makefile("rb", 65536))

关于python-3.x - Pyspark Column.isin() 用于大集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39000514/

相关文章:

python - Pandas 数据帧 : Split a single column into multiple columns

javascript - Transcrypt 在导入 json 时出现无法导入编解码器错误

hadoop - MLlib ALS无法删除检查点RDD错误的FS:预期的hdfs://[url]:file:///

apache-spark - 从 Row 创建 DataFrame 结果为 'infer schema issue'

python-2.7 - 如何在 pyspark 中创建具有两个输入的 UDF

python - 从文本文件创建迷宫数组

Python(17874,0x111e92dc0)malloc : can't allocate region

python - 需要帮助确定为什么我的 Theano 脚本无法运行

json - 不可读的笔记本 NotJSONError ('Notebook does not appear to be JSON: u\' {\\n "cells": [\\n {\\n "cell_type": "...' , )

python - 如何加载现有的 ipython 笔记本?