python - 派斯帕克。 spark.SparkException : Job aborted due to stage failure: Task 0 in stage 15. 0 失败 1 次,java.net.SocketException:连接重置

标签 python apache-spark pyspark apache-spark-sql

我是 pyspark 的新手,我正在尝试使用 pyspark 在 prophet 中运行多个时间序列(作为分布式计算,因为我有 100 多个时间序列要预测)但我有如下错误。

import time 
start_time = time.time()
sdf = spark.createDataFrame(data)
print('%0.2f min: Lags' % ((time.time() - start_time) / 60))
sdf.createOrReplaceTempView('Quantity')
spark.sql("select Reseller_City, Business_Unit, count(*) from Quantity group by Reseller_City, Business_Unit order by Reseller_City, Business_Unit").show()
query = 'SELECT Reseller_City, Business_Unit, conditions, black_week, promos, Sales_Date as ds, sum(Rslr_Sales_Quantity) as y FROM Quantity GROUP BY Reseller_City, Business_Unit, conditions, black_week, promos, ds ORDER BY Reseller_City, Business_Unit, ds'
spark.sql(query).show()
sdf.rdd.getNumPartitions()
store_part = (spark.sql(query).repartition(spark.sparkContext.defaultParallelism['Reseller_City','Business_Unit'])).cache()

store_part.explain()

from pyspark.sql.types import *

result_schema =StructType([
  StructField('ds',TimestampType()),
  StructField('Reseller_City',StringType()),
  StructField('Business_Unit',StringType()),
  StructField('y',DoubleType()),
  StructField('yhat',DoubleType()),
  StructField('yhat_upper',DoubleType()),
  StructField('yhat_lower',DoubleType())
  ])
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast_sales( store_pd ):
    
    model = Prophet(interval_width=0.95, holidays = lock_down)
    model.add_country_holidays(country_name='DE')
    model.add_regressor('conditions')
    model.add_regressor('black_week')
    model.add_regressor('promos')
    
    train = store_pd[store_pd['ds']<'2021-10-01 00:00:00']
    future_pd = store_pd[store_pd['ds']>='2021-10-01 00:00:00']
    model.fit(train[['ds', 'y', 'conditions', 'black_week', 'promos']])


    forecast_pd = model.predict(future_pd[['ds', 'conditions', 'black_week', 'promos']])  

    f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')

    #store_pd = store_pd.filter(store_pd['ds']<'2021-10-01 00:00:00')

    st_pd = future_pd[['ds','Reseller_City','Business_Unit','y']].set_index('ds')

    results_pd = f_pd.join( st_pd, how='left' )
    results_pd.reset_index(level=0, inplace=True)

    results_pd[['Reseller_City','Business_Unit']] = future_pd[['Reseller_City','Business_Unit']].iloc[0]

    return results_pd[ ['ds', 'Reseller_City','Business_Unit','y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
results = (store_part.groupBy(['Reseller_City','Business_Unit']).apply(forecast_sales).withColumn('training date', current_date() ))
results.cache()
results.show()

所有行都完美执行,但错误来自 results.show() 行我不明白我哪里做错了,如果有人帮助我,我将不胜感激

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-8c647e8bf4d9> in <module>
----> 1 results.show()

C:\spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

C:\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

C:\spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

C:\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o128.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 1243, Grogu.profiflitzer.local, executor driver): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

最佳答案

确保你已经设置了 spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

在这里阅读更多:https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html

如果这不起作用,请尝试增加驱动程序和工作程序的内存大小。

关于python - 派斯帕克。 spark.SparkException : Job aborted due to stage failure: Task 0 in stage 15. 0 失败 1 次,java.net.SocketException:连接重置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69973790/

相关文章:

python - 如何将 Python 字典转换为 html 表?

apache-spark - Spark : filter out all rows based on key/value

apache-spark - 通过 Hadoop 输入格式示例用于 pyspark 的 BigQuery 连接器

apache-spark - 分布式规则引擎

python - 提交 PySpark 应用以在集群模式下在 YARN 上产生 Spark

python - 根据字符串列表过滤 pyspark 数据帧

pyspark - Chain withColumn 用于在 PySpark 上多次更改一列

python - 我怎样才能吞掉Python异常消息?

python - 将任意代码注入(inject) Python SimpleXMLRPC 服务器

python - mac os中的pygame安装问题