python - “RuntimeError: generator raised StopIteration”: how do I solve this Python problem?

Tags: python pyspark spark-streaming

Spark version: 2.3.0

Python version: 3.7; I also tried 3.4.

When I run the following code through spark-submit, passing the file name as the argument:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Local streaming context with a 2-second batch interval
sc = SparkContext("local[*]", "KafkaStreamingConsumer")
ssc = StreamingContext(sc, 2)

# Receiver-based Kafka stream: ZooKeeper quorum, consumer group, {topic: partition count}
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "test-consumer-group", {"test": 1})

# Each record is a (key, value) pair; keep only the message value
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()

the following error is thrown:

2019-06-14 14:23:11 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1354, in takeUpToNumLeft
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
        at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-06-14 14:23:11 ERROR TaskSetManager:70 - Task 0 in stage 1.0 failed 1 times; aborting job
2019-06-14 14:23:11 INFO  TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool
2019-06-14 14:23:11 INFO  TaskSchedulerImpl:54 - Cancelling stage 1

Driver stacktrace:
2019-06-14 14:23:11 INFO  DAGScheduler:54 - Job 1 failed: runJob at PythonRDD.scala:141, took 1.225432 s
2019-06-14 14:23:11 INFO  JobScheduler:54 - Finished job streaming job 1560489790000 ms.0 from job set of time 1560489790000 ms
2019-06-14 14:23:11 ERROR JobScheduler:91 - Error running job streaming job 1560489790000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 65, in call
    r = self.func(t, *rdds)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1358, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 1001, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1354, in takeUpToNumLeft
StopIteration

2019-06-14 14:23:11 INFO  StreamingContext:54 - Invoking stop(stopGracefully=false) from shutdown hook
2019-06-14 14:23:11 INFO  ReceiverTracker:54 - Sent stop signal to all 1 receivers
2019-06-14 14:23:11 INFO  ReceiverSupervisorImpl:54 - Received stop signal
2019-06-14 14:23:11 INFO  ReceiverSupervisorImpl:54 - Stopping receiver with message: Stopped by driver:

When I submit the Spark job, the stream itself runs fine. The error (StopIteration) is raised as soon as I enter some input in the producer console.

I think this is related to Python: the same error is raised whether I use Python 3.7 or 3.4.

Please help me. Thanks.

Best Answer

I ran into the same error while consuming some Kafka topics with pyspark. I found some clues in this helpful answer, which includes a fix for the StopIteration exception: https://stackoverflow.com/a/51701040/7781704

In my case, the error was thrown because Python 3.7 is not compatible with Spark 2.3.0. The traceback shows why: since PEP 479 (on by default in Python 3.7), a StopIteration that escapes a generator body is converted into a RuntimeError, and takeUpToNumLeft in Spark 2.3.0's rdd.py lets exactly such a StopIteration escape.
After upgrading Spark to 2.4.4, it ran normally.
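
For reference, here is a minimal, Spark-free sketch of that PEP 479 behaviour. broken_take and fixed_take are made-up names for this illustration; the first imitates the pattern the traceback points at (next() called inside a generator, as in takeUpToNumLeft), the second shows the usual repair:

# Runs on Python 3.7+, where PEP 479 is on by default: a StopIteration that
# escapes a generator body becomes RuntimeError("generator raised StopIteration").

def broken_take(iterator, n):
    # Imitates Spark 2.3.0's takeUpToNumLeft: next() is called inside the
    # generator, so an exhausted iterator raises StopIteration in the
    # generator body, which Python 3.7+ converts into a RuntimeError.
    taken = 0
    while taken < n:
        yield next(iterator)
        taken += 1

def fixed_take(iterator, n):
    # The usual repair: catch StopIteration and return, which ends the
    # generator cleanly on every Python version.
    taken = 0
    while taken < n:
        try:
            yield next(iterator)
        except StopIteration:
            return
        taken += 1

print(list(fixed_take(iter([1, 2]), 5)))    # [1, 2]
try:
    list(broken_take(iter([1, 2]), 5))
except RuntimeError as exc:
    print(exc)                              # generator raised StopIteration

Incidentally, the “generator raised StopIteration” wording only exists on Python 3.7+, so the attempt with Python 3.4 in the question that still showed it was presumably still picking up a 3.7 interpreter on the workers (e.g. via PYSPARK_PYTHON). If upgrading Spark is not an option, running the whole job under Python 3.6 or earlier should avoid the RuntimeError as well.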

Regarding python - “RuntimeError: generator raised StopIteration”: how do I solve this Python problem?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56591963/
