python - Pyspark: Unable to write files to Azure Blob Storage

Tags: python azure apache-spark pyspark

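I'm currently working on a project where I'm trying to store AVRO files on Azure Blob Storage using PySpark, without any luck. Writing a CSV file fails as well. Running the script with spark-submit and passing the jar file paths through the --jars argument produces the same error.

I'm running the code sample in a Jupyter Notebook in VS Code.

Can someone help me resolve this error?

Sample code: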

import multiprocessing

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f


STORAGE_ACCOUNT_NAME = "mystorageaccount"
# Placeholder: the original snippet used this name without defining it.
STORAGE_ACCOUNT_ACCESS_KEY = "<storage-account-access-key>"
CONTAINER_NAME = "mycontainer"
SPARK_AVRO_JAR_PATH = "/path_to_jar/spark-avro_2.12-3.2.0.jar"
AZURE_STORAGE_BLOB_JAR_PATH = "/path_to_jar/azure-storage-blob-12.14.3.jar"
AZURE_STORAGE_JAR_PATH = "/path_to_jar/azure-storage-8.6.6.jar"
HADOOP_AZURE_JAR_PATH = "/path_to_jar/hadoop-azure-3.3.1.jar"
DESTINATION_CONTAINER_STR = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/"

# Use all cores but one, and never fewer than one.
cpu = max(multiprocessing.cpu_count() - 1, 1)

# Only the jars listed here are added to the classpath; their own
# dependencies are not resolved automatically.
jars = ",".join([
    SPARK_AVRO_JAR_PATH,
    AZURE_STORAGE_BLOB_JAR_PATH,
    AZURE_STORAGE_JAR_PATH,
    HADOOP_AZURE_JAR_PATH,
])

spark = (
    SparkSession.builder
    .master(f"local[{cpu}]")
    .appName("MyApp")
    .config("spark.jars", jars)
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.sparkContext.setLogLevel("WARN")

# Storage account key used by the wasbs:// file system.
spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
    STORAGE_ACCOUNT_ACCESS_KEY,
)

file_name = "test"
df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
df.write.format("avro").save(f"{DESTINATION_CONTAINER_STR}{file_name}")

Error message:

Py4JJavaError                             Traceback (most recent call last)
/var/folders/93/_09pzbms1d9cyfm7mkt6hnhm0000gn/T/ipykernel_74366/438907801.py in <module>
     81 file_name = "test"
     82 df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
---> 83 df.write.format("csv").save(f"{DESTINATION_CONTAINER_STR}{file_name}")

~/jens/.venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    738             self._jwrite.save()
    739         else:
--> 740             self._jwrite.save(path)
    741 
    742     @since(1.4)

~/jens/.venv/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/jens/.venv/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~/jens/.venv/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o152.save.
: java.lang.NoClassDefFoundError: org/eclipse/jetty/util/ajax/JSON$Convertor
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.createDefaultStore(NativeAzureFileSystem.java:1441)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1366)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:461)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:556)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.util.ajax.JSON$Convertor
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 25 more

Versions:

(.venv) jens % pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user ubuntu on 2021-10-06T12:46:30Z
Revision 5d45a415f3a29898d92380380cfd82bfc7f579ea
Url https://github.com/apache/spark
Type --help for more information.
(.venv) jens % python --version
Python 3.9.7

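Best answer

To me, this looks less like an access error and more like a code error. The NoClassDefFoundError for org/eclipse/jetty/util/ajax/JSON$Convertor is raised while NativeAzureFileSystem is being initialized, which points to a class missing from the JVM classpath rather than to a rejected storage account key.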
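One way to check this is to look inside the jars that were passed to Spark and see whether any of them actually contains the class named in the stack trace. The sketch below does that with the standard library, and also builds a value for spark.jars.packages so that Spark resolves hadoop-azure together with its transitive dependencies from Maven (which, as far as I know, include jetty-util-ajax) instead of relying on a hand-picked list of jars. The helper names (class_entry, jars_containing, maven_packages, build_session) are made up for this example, and the version defaults are assumptions taken from the versions shown in the question.

```python
import zipfile
from pathlib import Path
from typing import List

# Class named in the ClassNotFoundException from the stack trace.
MISSING_CLASS = "org.eclipse.jetty.util.ajax.JSON$Convertor"


def class_entry(class_name: str) -> str:
    """Convert a dotted JVM class name to its entry path inside a jar."""
    return class_name.replace(".", "/") + ".class"


def jars_containing(class_name: str, jar_paths: List[str]) -> List[str]:
    """Return the jars from jar_paths that contain class_name."""
    entry = class_entry(class_name)
    hits = []
    for jar in jar_paths:
        if not Path(jar).is_file():
            continue  # skip paths that do not exist on this machine
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar)
    return hits


def maven_packages(spark_version: str = "3.2.0",
                   scala_version: str = "2.12",
                   hadoop_version: str = "3.3.1") -> str:
    """Build a spark.jars.packages value (versions are assumptions)."""
    return ",".join([
        f"org.apache.spark:spark-avro_{scala_version}:{spark_version}",
        f"org.apache.hadoop:hadoop-azure:{hadoop_version}",
    ])


def build_session(app_name: str = "MyApp"):
    """Create a session that lets Spark resolve the jars from Maven."""
    # Imported lazily so the helpers above work without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master("local[*]")
        .appName(app_name)
        .config("spark.jars.packages", maven_packages())
        .getOrCreate()
    )
```

If jars_containing(MISSING_CLASS, [...]) returns an empty list for the four jars from the question, none of them ships the class, which would match the error. Note that spark.jars.packages only takes effect when the JVM is started, so in a notebook the kernel has to be restarted before calling build_session(); an already running session returned by getOrCreate() keeps its old classpath.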

This question, python - Pyspark: Unable to write files to Azure Blob Storage, is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/70823662/
