我正在尝试读取 XML 文件:
df = spark.read.format('com.databricks.spark.xml').load('/path/to/my.xml')
并收到以下错误:
java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.xml
我尝试过:
安装 pyspark-xml
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.10.0
使用配置运行 Spark:
set jar_path = f'{SPARK_HOME}/jars/spark-xml_2.12-0.10.0.jar' spark = SparkSession.builder.config(conf=conf).config("spark.jars", jar_path).config("spark.executor.extraClassPath", jar_path).config("spark.executor.extraLibrary", jar_path).config("spark.driver.extraClassPath", jar_path).appName('my_app').getOrCreate()
设置 evn 变量:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.10.0 pyspark'
下载jar文件并放入SPARK_HOME/jars
这里:https://github.com/databricks/spark-xml在“Pyspark 注释”段落中有 PySpark 的替代解决方案,但我不知道如何读取数据帧以便将其传递到函数 ext_schema_of_xml_df 中。
那么,我还应该怎么做才能在 JupyterLab 中使用 PySpark 读取 XML?
最佳答案
正如您所猜测的,关键是要加载包,以便 PySpark 能够在 Jupyter 的上下文中使用它。
使用常规导入启动您的笔记本:
import pandas as pd
from pyspark.sql import SparkSession
import os
在实例化 session 之前,请执行以下操作:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0 pyspark-shell'
注释:
- 包版本的第一部分必须与构建 Spark 所用的 Scala 版本相匹配 - 您可以通过从命令行执行 Spark-submit --version 来找到这一点。例如
$ spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.2
/_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-02-16T06:09:22Z
Revision 648457905c4ea7d00e3d88048c63f360045f0714
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
软件包版本的第二部分必须是为给定版本的 Scala 提供的内容 - 您可以在这里找到: https://github.com/databricks/spark-xml - 所以就我而言,由于我使用 Scala 2.12 构建了 Spark,所以我需要的包是 com.databricks:spark-xml_2.12:0.12.0
现在实例化您的 session :
# Creates a session on a local master
sparkSesh = SparkSession.builder.appName("XML_Import") \
.master("local[*]").getOrCreate()
找到一个您知道其结构的简单 .xml 文件 - 在我的例子中,我使用了 nmap 输出的 XML 版本
thisXML = "simple.xml"
这样做的原因是您可以为下面的“rootTag”和“rowTag”提供适当的值:
someXSDF = sparkSesh.read.format('xml') \
.option('rootTag', 'nmaprun') \
.option('rowTag', 'host') \
.load(thisXML)
如果文件足够小,您可以执行 .toPandas() 来查看它:
someXSDF.toPandas()[["address", "ports"]][:5]
然后关闭 session 。
sparkSesh.stop()
结束语:
- 如果您想在 Jupyter 之外进行测试,只需进入命令行即可
pyspark --packages com.databricks:spark-xml_2.12:0.12.0
您应该看到它在 PySpark shell 中正确加载
- 如果包版本与 scala 版本不匹配,您可能会收到此错误:
“异常:Java gateway process exited before Sending its port number”
这是一种非常有趣的方式解释一下包版本号错误 - 如果您为用于构建 Spark 的 Scala 版本加载了错误的包,则在尝试读取 XML 时可能会收到此错误:
py4j.protocol.Py4JJavaError:错误调用 o43.load 时发生。 : java.lang.NoClassDefFoundError: scala/Product$class
- 如果读取似乎有效,但您得到一个空数据框,则您可能指定了错误的根标记和/或行标记
- 如果您需要支持多种读取类型(假设您还需要能够读取同一笔记本中的 Avro 文件),您可以列出多个包,并用逗号(无空格)分隔它们,如下所示:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0,org.apache.spark:spark-avro_2.12:3.1.2 pyspark-shell'
- 我的版本信息:Python 3.6.9、Spark 3.0.2
关于python - 在 Jupyter 笔记本中使用 PySpark 读取 XML,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63951922/