I installed Spark via pip install pyspark.
I'm using the following code to create a dataframe from a file on S3:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config('spark.driver.extraClassPath', '/home/ubuntu/spark/jars/aws-java-sdk-1.11.335.jar:/home/ubuntu/spark/jars/hadoop-aws-2.8.4.jar') \
    .appName("cluster").getOrCreate()

df = spark.read.load('s3a://bucket/path/to/file')
But I get this error:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in ()
----> 1 df = spark.read.load('s3a://bucket/path/to/file')

~/miniconda3/envs/audience/lib/python3.6/site-packages/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    164         self.options(**options)
    165         if isinstance(path, basestring):
--> 166             return self._df(self._jreader.load(path))
    167         elif path is not None:
    168             if type(path) != list:

~/miniconda3/envs/audience/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161
   1162         for temp_arg in temp_args:

~/miniconda3/envs/audience/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/miniconda3/envs/audience/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o220.load.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StorageStatistics
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:44)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StorageStatistics
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 28 more
If I change s3a to s3 or s3n, it instead asks for an AWS access key, even though I have already granted the EC2 instance AmazonS3FullAccess in IAM:
IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified by setting the fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey properties (respectively).'
Any help would be appreciated.
Best Answer
You need a way to expose your AWS credentials to the script.
The example below using botocore may be overkill, but it saves you from rolling your own AWS configuration or credentials parser.
First,
pip install botocore
Then create a session and blindly resolve your credentials. The credential resolution order is documented here.
from pyspark.sql import SparkSession
import botocore.session

session = botocore.session.get_session()
credentials = session.get_credentials()

spark = (
    SparkSession
    .builder
    .config(
        'spark.driver.extraClassPath',
        '/home/ubuntu/spark/jars/aws-java-sdk-1.11.335.jar:'
        '/home/ubuntu/spark/jars/hadoop-aws-2.8.4.jar')
    .config('fs.s3a.access.key', credentials.access_key)
    .config('fs.s3a.secret.key', credentials.secret_key)
    .appName("cluster")
    .getOrCreate()
)

df = spark.read.load('s3a://bucket/path/to/file')
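Note that if nothing in botocore's lookup chain matches, get_credentials() returns None, so a short guard before building the session surfaces the problem early; a minimal sketch:

# Guard against an empty credential chain: get_credentials() returns None
# when no env vars, config files, or instance metadata credentials are found.
if credentials is None:
    raise RuntimeError('botocore could not resolve any AWS credentials')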
Edit
When using the s3n filesystem client, the authentication properties look like this:
    .config('fs.s3n.awsAccessKeyId', credentials.access_key)
    .config('fs.s3n.awsSecretAccessKey', credentials.secret_key)
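If the SparkSession already exists, the same two properties can also be set on its Hadoop configuration rather than on the builder. A sketch, assuming the spark and credentials objects from above (this relies on the private _jsc handle that pyspark exposes):

# Apply the s3n credentials to an already-running session's Hadoop configuration
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3n.awsAccessKeyId', credentials.access_key)
hadoop_conf.set('fs.s3n.awsSecretAccessKey', credentials.secret_key)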
Regarding "python - How should I load a file on s3 using Spark?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50499894/