apache-spark - Apache Spark writing to S3 cannot move Parquet files out of the temporary folder

Tags: apache-spark amazon-s3 spark-dataframe parquet

I have an 8-hour job (Spark 2.0.0) that writes its results out to Parquet using the standard approach:

processed_images_df.write.format("parquet").save(s3_output_path) 

It runs 10,000 tasks, writing their output into a _temporary folder, and in the final step (after all tasks have finished) it copies the Parquet files out of that _temporary folder. After copying roughly 2,000-3,000 files it fails with the error below. At first I assumed a transient S3 failure, but I re-ran the job 3 times and got the same error each time:
org.apache.spark.SparkException: Job aborted. 
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149) 
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115) 
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115) 
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115) 
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60) 
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58) 
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) 
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487) 
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:606) 
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
        at py4j.Gateway.invoke(Gateway.java:280) 
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) 
        at py4j.commands.CallCommand.execute(CallCommand.java:79) 
        at py4j.GatewayConnection.run(GatewayConnection.java:211) 
        at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.http.NoHttpResponseException: s3-bucket.s3.amazonaws.com:443 failed to respond 
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143) 
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261) 
        at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
        at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259) 
        at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:232) 
        at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272) 
        at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124) 
        at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686) 
        at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488) 
        at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884) 
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) 
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326) 
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277) 
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1143) 
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2117) 
        at org.jets3t.service.StorageService.copyObject(StorageService.java:898) 
        at org.jets3t.service.StorageService.copyObject(StorageService.java:943) 
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:320) 
        at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:606) 
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) 
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) 
        at org.apache.hadoop.fs.s3native.$Proxy20.copy(Unknown Source) 
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:645) 
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345) 
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) 
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) 
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46) 
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222) 
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
        ... 29 more

Best Answer

The solution I found for this problem was to update Hadoop to 2.7 and set

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
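For reference, a minimal PySpark sketch of setting this property when building the session; the app name, example DataFrame, and output path/scheme are placeholders, not taken from the original question:

from pyspark.sql import SparkSession

# Use the v2 FileOutputCommitter algorithm: task output is moved into place
# as each task commits, instead of one large rename pass at job commit.
spark = (
    SparkSession.builder
    .appName("write-parquet-to-s3")  # placeholder app name
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)

df = spark.range(10)  # stand-in for processed_images_df from the question
df.write.format("parquet").save("s3a://my-bucket/output/")  # placeholder path; use the S3 scheme your cluster is configured for

The same property can also be passed on the command line, e.g. spark-submit --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2.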

Spark 1.6 had an alternative FileOutputCommitter that wrote directly to S3, but it was deprecated in Spark 2.0.0: https://issues.apache.org/jira/browse/SPARK-10063

A similar question on Stack Overflow: https://stackoverflow.com/questions/39778587/
