python - How should I load a file from S3 using Spark?

Tags: python apache-spark amazon-s3 pyspark

I installed Spark via pip install pyspark.

I'm using the following code to create a DataFrame from a file on S3:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .config('spark.driver.extraClassPath', '/home/ubuntu/spark/jars/aws-java-sdk-1.11.335.jar:/home/ubuntu/spark/jars/hadoop-aws-2.8.4.jar') \
            .appName("cluster").getOrCreate()
df = spark.read.load('s3a://bucket/path/to/file')

But I get an error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 df = spark.read.load('s3a://bucket/path/to/file')

~/miniconda3/envs/audience/lib/python3.6/site-packages/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    164         self.options(**options)
    165         if isinstance(path, basestring):
--> 166             return self._df(self._jreader.load(path))
    167         elif path is not None:
    168             if type(path) != list:

~/miniconda3/envs/audience/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161
   1162         for temp_arg in temp_args:

~/miniconda3/envs/audience/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/miniconda3/envs/audience/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318             raise Py4JJavaError(
    319                 "An error occurred while calling {0}{1}{2}.\n".
--> 320                 format(target_id, ".", name), value)
    321         else:
    322             raise Py4JError(

Py4JJavaError: An error occurred while calling o220.load.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StorageStatistics
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:44)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StorageStatistics
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 28 more

If I change s3a to s3 or s3n, it asks for an AWS access key. However, I have already granted the EC2 instance AmazonS3FullAccess through IAM:

IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified by setting the fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey properties (respectively).'
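The message appears to want something like the following (a sketch only; the property names are copied verbatim from the error above, and the placeholder values are hypothetical, not real credentials):

# Sketch only: what the s3:// client seems to be asking for, per the
# error above. The placeholder values are hypothetical.
spark = SparkSession.builder \
    .config('fs.s3.awsAccessKeyId', '<ACCESS_KEY_ID>') \
    .config('fs.s3.awsSecretAccessKey', '<SECRET_ACCESS_KEY>') \
    .appName("cluster").getOrCreate()

but I would rather rely on the instance role than hard-code keys.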

Any help would be appreciated.

Best Answer

You need a way to expose your AWS credentials to the script.

The example below uses botocore and may be overkill, but it saves you from rolling your own AWS config or credential parser.

First,

pip install botocore

Then create a session and blindly resolve your credentials. The credential resolution order is documented in the boto3/botocore configuration guide (roughly: explicit parameters, then environment variables, then the shared credentials file at ~/.aws/credentials, then the AWS config file, then IAM instance-profile metadata).

from pyspark.sql import SparkSession
import botocore.session

# Let botocore walk the standard AWS credential chain
# (env vars, shared credentials file, instance profile, ...).
session = botocore.session.get_session()
credentials = session.get_credentials()

spark = (
    SparkSession
    .builder
    .config(
        'spark.driver.extraClassPath',
        '/home/ubuntu/spark/jars/aws-java-sdk-1.11.335.jar:'
        '/home/ubuntu/spark/jars/hadoop-aws-2.8.4.jar')
    # Hand the resolved keys to the s3a filesystem client.
    .config('fs.s3a.access.key', credentials.access_key)
    .config('fs.s3a.secret.key', credentials.secret_key)
    .appName("cluster")
    .getOrCreate()
)

df = spark.read.load('s3a://bucket/path/to/file')
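If you would rather not pass the keys through the session builder, a common variant (a sketch of mine, not part of the original answer) sets them on the Hadoop configuration of an already-built session:

# Alternative sketch: configure the s3a client after the session exists.
# _jsc is pyspark's private handle to the underlying JavaSparkContext.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', credentials.access_key)
hadoop_conf.set('fs.s3a.secret.key', credentials.secret_key)

df = spark.read.load('s3a://bucket/path/to/file')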

EDIT

When using the s3n filesystem client, the authentication properties look like this:

.config('fs.s3n.awsAccessKeyId', credentials.access_key)
.config('fs.s3n.awsSecretAccessKey', credentials.secret_key)
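Since the question mentions an EC2 instance role: as an aside of mine (not part of the original answer), the s3a client in hadoop-aws 2.8.x can also be pointed at the AWS SDK's instance-profile provider, so no keys need to touch the script at all. Whether this works here still depends on the jars actually on the classpath:

# Sketch: let s3a read credentials from the EC2 instance profile itself.
# fs.s3a.aws.credentials.provider is a hadoop-aws 2.8+ option; the provider
# class ships in the aws-java-sdk jar referenced earlier.
spark = (
    SparkSession
    .builder
    .config('fs.s3a.aws.credentials.provider',
            'com.amazonaws.auth.InstanceProfileCredentialsProvider')
    .appName("cluster")
    .getOrCreate()
)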

Regarding "python - How should I load a file from S3 using Spark?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50499894/
