python - PySpark RuntimeError: Set changed size during iteration

Tags: python apache-spark pyspark

I am running a PySpark streaming script and hit the error below. The traceback seems to point at my line "if len(rdd.take(1)) > 0:" and reports "RuntimeError: Set changed size during iteration". I am not sure whether that line is really the cause, and I would like to understand what is actually going wrong. Any help would be appreciated.

Thanks!
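For context, the traceback below is consistent with a driver-side pattern roughly like the following sketch: a per-batch callback that checks rdd.take(1) and re-creates a broadcast variable while other streaming output operations are running. Only _compute_glb_max, rdd.take(1) and ssc.awaitTermination() come from the traceback; every other name and detail here is an assumption for illustration, not the actual script.

    # Hypothetical reconstruction -- not the original script.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="set-changed-size-sketch")
    ssc = StreamingContext(sc, 2)                 # 2-second micro-batches (assumed)

    glb_max = sc.broadcast(0)                     # driver-held "global max"

    def _compute_glb_max(rdd):
        global glb_max
        if len(rdd.take(1)) > 0:                  # the line the traceback points at
            batch_max = rdd.max()
            if batch_max > glb_max.value:
                glb_max = sc.broadcast(batch_max) # a new broadcast on every batch

    values = ssc.socketTextStream("localhost", 9999).map(int)  # stand-in for the Kafka source
    values.foreachRDD(lambda t, rdd: _compute_glb_max(rdd))
    # A second output operation whose executor-side closure captures the
    # (constantly replaced) broadcast; serializing it each batch can collide
    # with the job above and surface as "Set changed size during iteration".
    values.filter(lambda v: v >= glb_max.value).count().pprint()

    ssc.start()
    ssc.awaitTermination()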

17/03/23 21:54:17 INFO DStreamGraph: Updated checkpoint data for time 1490320070000 ms
17/03/23 21:54:17 INFO JobScheduler: Finished job streaming job 1490320072000 ms.0 from job set of time 1490320072000 ms
17/03/23 21:54:17 INFO JobScheduler: Starting job streaming job 1490320072000 ms.1 from job set of time 1490320072000 ms
17/03/23 21:54:17 ERROR JobScheduler: Error running job streaming job 1490320072000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py",

line 65, in call r = self.func(t, *rdds) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in func = lambda t, rdd: old_func(rdd) File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max if len(rdd.take(1)) > 0: File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd self._jrdd_deserializer, profiler) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] RuntimeError: Set changed size during iteration

  at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
  at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py",

line 224, in ssc.awaitTermination(); File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 206, in awaitTermination File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o38.awaitTermination. : org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call r = self.func(t, *rdds) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in func = lambda t, rdd: old_func(rdd) File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max if len(rdd.take(1)) > 0: File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd self._jrdd_deserializer, profiler) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] RuntimeError: Set changed size during iteration

  at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
  at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Best answer

Creating broadcast variables inside the streaming loop does not seem to be good practice, and it is what the last frame of the traceback points at: the error is raised while PySpark iterates the driver's set of pickled broadcast variables (sc._pickled_broadcast_vars), and "Set changed size during iteration" means that set was modified, most likely by another streaming job serializing its closures, while the iteration was in progress. Whenever you need state that carries over between batches, use updateStateByKey instead, if possible.
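A minimal sketch of that suggestion, assuming the goal is a running global maximum over integer values arriving on the stream; the key name, checkpoint path and socket source are placeholders:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="global-max-with-state")
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint("/tmp/spark-checkpoint")   # stateful DStreams require checkpointing

    def update_max(new_values, current_max):
        # new_values: values seen for this key in the current batch
        # current_max: state carried over from earlier batches (None on first use)
        candidates = list(new_values) + ([current_max] if current_max is not None else [])
        return max(candidates) if candidates else current_max

    values = ssc.socketTextStream("localhost", 9999).map(int)
    # Fold everything into a single piece of state under one constant key,
    # instead of re-broadcasting a "global max" from the driver each batch.
    running_max = values.map(lambda v: ("glb_max", v)).updateStateByKey(update_max)
    running_max.pprint()

    ssc.start()
    ssc.awaitTermination()

The running maximum then lives as Spark-managed state that is updated once per batch, so nothing needs to be created or re-broadcast from inside foreachRDD.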

Regarding python - PySpark RuntimeError: Set changed size during iteration, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/42990249/
