java - Spark使用s3a以多线程方式写入文件

标签 java apache-spark amazon-s3

我正在使用本地 Spark 从 s3 读取和写入。为了写回 s3,我使用 java 并发 util,这样我就可以以多线程方式编写。

这是我的实现

ConvertToCsv 该方法具有spark.write 操作

 for ( String Id: listOfId) {

                Future<?> future = executor.submit( () -> {

                ConvertToCsv( dataFrame, destinationPath, Id);
            } );
            futures.add( future );

        } 

我收到此错误!

No such file or directory: s3a://kira-bucket-parquet/collection/82709dd1-8924-481c-9d93-14a9e2e0c524/5e67e9d5-2d8b-4c4b-928a-4736485af3ca/_temporary/0 at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2269) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2163) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2102) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1903) at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1882) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1882) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1919) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1961) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala

我遇到的解决方案是配置 s3a 提交者。

如何在本地 Spark 中配置 S3a 提交者?还有其他解决方案吗?

最佳答案

要安全地提交工作(即使是在本地),您可以使用 S3A 提交程序。

虽然它们位于 hadoop-aws JAR 中,但它们是针对 Spark 以及 MapReduce 设计和测试的。

咨询the documentation

关于java - Spark使用s3a以多线程方式写入文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59445820/

相关文章:

apache-spark - 监控 Dataproc 集群上的 Spark-Shell 或 PySpark-Shell session

java - Apache Wicket 中继器 : an overview

java - 在 Java 中,cast(即(ClassName))的实例和类型如何作用于代理对象?

scala - `sbt run` 导致添加依赖后编译报错

java - 无法在 Windows 10 上设置 Apache Spark 2.1.1

ios - AWS S3 iOS 开发工具包 : How to resume upload after connection is interrupted?

amazon-web-services - AWS S3 CLI - 无法连接到终端节点 URL

python - 使用 boto3 读取指定文件夹/目录下 Amazon S3 文件夹上的表名称列表

java - Lucene 的近实时搜索实现

java - 无法从android中的远程ubuntu tomcat服务器连接mysql数据库