Code:
views = sdf \
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()
for sess_id, rows in views:
    pass  # do something
PRODUCTS is a set. It is quite large, about 10,000 elements. The code fails:
--> 9 for sess_id, rows in views:
/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142 for item in serializer.load_stream(rf):
/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139 yield self._read_with_length(stream)
/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156 length = read_int(stream)
/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543 length = stream.read(4)
/opt/conda/lib/python3.5/socket.py in readinto(self, b)
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
577 self._timeout_occurred = True
timeout: timed out
But when I make the PRODUCTS set smaller, everything works fine. I tried changing some timeout values in the Spark configuration; it did not help. How can I avoid crashes like this? (A guess at what that configuration attempt might have looked like is sketched below.)
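The question does not say which timeout settings were tried; a plausible guess is shown below. These are real Spark settings, but they do not control the socket used by the local iterator, which is why raising them does not help here:

from pyspark.sql import SparkSession

# Hypothetical attempt: raise the generic network timeouts. These settings
# exist, but they do not affect the short socket read timeout inside
# pyspark's _load_from_socket, so the crash persists.
spark = SparkSession.builder \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()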
UPDATE:
PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()
views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()
for sess_id, rows in views:
    pass  # do ...
Now PRODUCTS is a DataFrame and I use a join instead. I get the same error. (For reference, a broadcast-join variant of this join is sketched below.)
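Not from the original question: since PRODUCTS holds only ~10,000 rows, the usual idiom is to hint it as a broadcast join, which ships the small table to every executor instead of shuffling the large sdf side. This does not address the toLocalIterator() timeout diagnosed in the answer below, but it is the standard way to join against a small set:

from pyspark.sql.functions import broadcast

# Hint that PRODUCTS is small enough to broadcast to each executor,
# avoiding a shuffle of the large sdf side of the join.
views = sdf \
    .join(broadcast(PRODUCTS), 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()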
UPDATE 2:
Tried this solution:
views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID'])
views.cache()
for sess_id, rows in views.toLocalIterator():
    pass
After a while I got a long error:
Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....
That error appeared only once! Now I am getting the same timeout exception again!
Best Answer
I believe this is essentially caused by a bug in the implementation of toLocalIterator() in pyspark 2.0.2. You can read more here: [SPARK-18281][SQL][PySpark] Remove timeout for reading data through socket for local iterator. It appears the fix will be available in the next update after 2.0.2 and in the 2.1.x release. If you want to fix it yourself temporarily, you can apply the change from the issue above:
Replace this near line 138 of rdd.py (note that on an actual Spark cluster it seems you need to update the rdd.py inside pyspark.zip):
try:
    rf = sock.makefile("rb", 65536)
    for item in serializer.load_stream(rf):
        yield item
finally:
    sock.close()
with this:
sock.settimeout(None)  # << this is the key line that disables the timeout after the initial connection
return serializer.load_stream(sock.makefile("rb", 65536))
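A possible alternative to editing rdd.py on disk (not part of the original answer) is to apply the same change at runtime by monkey-patching pyspark.rdd._load_from_socket in the driver process. The connection loop below is a reconstruction of pyspark 2.0.2's helper and should be checked against your installed version:

import socket
import pyspark.rdd

def _load_from_socket_no_timeout(port, serializer):
    # Reconstruction of pyspark 2.0.2's _load_from_socket with the
    # SPARK-18281 change applied: the timeout is lifted once connected.
    sock = None
    # Try both IPv4 and IPv6 loopback addresses, like the stock helper.
    for res in socket.getaddrinfo("localhost", port,
                                  socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(3)  # short timeout only while connecting
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    sock.settimeout(None)  # disable the timeout while streaming the rows
    return serializer.load_stream(sock.makefile("rb", 65536))

# Install the patch before calling toLocalIterator(); RDD.toLocalIterator
# resolves _load_from_socket through the module, so rebinding it is enough.
pyspark.rdd._load_from_socket = _load_from_socket_no_timeout

This only patches the driver process, which is where _load_from_socket runs; as noted above, on a cluster the copy of rdd.py inside pyspark.zip may still need updating.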
Regarding python-3.x - Pyspark Column.isin() for a large set, the original question on Stack Overflow: https://stackoverflow.com/questions/39000514/