python - 在 Jupyter 笔记本中使用 PySpark 读取 XML

标签 python xml apache-spark pyspark

我正在尝试读取 XML 文件: df = spark.read.format('com.databricks.spark.xml').load('/path/to/my.xml') 并收到以下错误:

java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.xml

我尝试过:

  • 安装 pyspark-xml

    $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.10.0
    
    
  • 使用配置运行 Spark:set jar_path = f'{SPARK_HOME}/jars/spark-xml_2.12-0.10.0.jar' spark = SparkSession.builder.config(conf=conf).config("spark.jars", jar_path).config("spark.executor.extraClassPath", jar_path).config("spark.executor.extraLibrary", jar_path).config("spark.driver.extraClassPath", jar_path).appName('my_app').getOrCreate()

  • 设置 evn 变量:os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.10.0 pyspark'

  • 下载jar文件并放入SPARK_HOME/jars

这里:https://github.com/databricks/spark-xml在“Pyspark 注释”段落中有 PySpark 的替代解决方案,但我不知道如何读取数据帧以便将其传递到函数 ext_schema_of_xml_df 中。

那么,我还应该怎么做才能在 JupyterLab 中使用 PySpark 读取 XML?

最佳答案

正如您所猜测的,关键是要加载包,以便 PySpark 能够在 Jupyter 的上下文中使用它。

使用常规导入启动您的笔记本:

import pandas as pd
from pyspark.sql import SparkSession
import os

在实例化 session 之前,请执行以下操作:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0 pyspark-shell'

注释:

  • 包版本的第一部分必须与构建 Spark 所用的 Scala 版本相匹配 - 您可以通过从命令行执行 Spark-submit --version 来找到这一点。例如
$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-02-16T06:09:22Z
Revision 648457905c4ea7d00e3d88048c63f360045f0714
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.

软件包版本的第二部分必须是为给定版本的 Scala 提供的内容 - 您可以在这里找到: https://github.com/databricks/spark-xml - 所以就我而言,由于我使用 Scala 2.12 构建了 Spark,所以我需要的包是 com.databricks:spark-xml_2.12:0.12.0

现在实例化您的 session :

# Creates a session on a local master
sparkSesh = SparkSession.builder.appName("XML_Import") \
    .master("local[*]").getOrCreate()

找到一个您知道其结构的简单 .xml 文件 - 在我的例子中,我使用了 nmap 输出的 XML 版本

thisXML = "simple.xml"

这样做的原因是您可以为下面的“rootTag”和“rowTag”提供适当的值:

someXSDF = sparkSesh.read.format('xml') \
        .option('rootTag', 'nmaprun') \
        .option('rowTag', 'host') \
        .load(thisXML)

如果文件足够小,您可以执行 .toPandas() 来查看它:

someXSDF.toPandas()[["address", "ports"]][:5]

enter image description here

然后关闭 session 。

sparkSesh.stop()

结束语:

  • 如果您想在 Jupyter 之外进行测试,只需进入命令行即可
pyspark --packages com.databricks:spark-xml_2.12:0.12.0

您应该看到它在 PySpark shell 中正确加载

  • 如果包版本与 scala 版本不匹配,您可能会收到此错误:“异常:Java gateway process exited before Sending its port number”这是一种非常有趣的方式解释一下包版本号错误
  • 如果您为用于构建 Spark 的 Scala 版本加载了错误的包,则在尝试读取 XML 时可能会收到此错误:py4j.protocol.Py4JJavaError:错误调用 o43.load 时发生。 : java.lang.NoClassDefFoundError: scala/Product$class
  • 如果读取似乎有效,但您得到一个空数据框,则您可能指定了错误的根标记和/或行标记
  • 如果您需要支持多种读取类型(假设您还需要能够读取同一笔记本中的 Avro 文件),您可以列出多个包,并用逗号(无空格)分隔它们,如下所示:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0,org.apache.spark:spark-avro_2.12:3.1.2 pyspark-shell'
  • 我的版本信息:Python 3.6.9、Spark 3.0.2

关于python - 在 Jupyter 笔记本中使用 PySpark 读取 XML,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63951922/

相关文章:

scala - 使用特殊格式压缩的 Spark 阅读

apache-spark - 如何从属性文件设置 Kafka 参数?

python - pexpect 有什么替代品吗?

java - 以 XML 格式发送 blob 或字节数组

python - 如何在图像的人脸/ body 区域中检测太阳镜

html - XPath//* vs//元素 vs//

xml - 为什么需要 XML 编码?

apache-spark - 广播哈希连接 - 迭代

Python 列表理解太慢

python - 如何在Open CV Python中消除这些噪音?