apache-spark - Spark 中的 xml 处理

标签 apache-spark

场景: 我的输入将是多个小的 XML,并且应该将这些 XML 读取为 RDD。执行与另一个数据集的连接并形成一个 RDD,并将输出作为 XML 发送。

是否可以使用 spark 读取 XML,将数据加载为 RDD?如果可能,将如何读取 XML。

示例 XML:

<root>
    <users>
        <user>
              <account>1234<\account>
              <name>name_1<\name>
              <number>34233<\number>
         <\user>
         <user>
              <account>58789<\account>
              <name>name_2<\name>
              <number>54697<\number>
         <\user>    
    <\users>
<\root>

如何将其加载到 RDD 中?

最佳答案

是的,有可能,但细节会因您采用的方法而异。

  • 如果文件很小,如您所提到的,最简单的解决方案是使用 SparkContext.wholeTextFiles 加载您的数据。它将数据加载为 RDD[(String, String)],其中第一个元素是路径,第二个元素是文件内容。然后像在本地模式中一样单独解析每个文件。
  • 对于较大的文件,您可以使用 Hadoop input formats .
    • 如果结构简单,您可以使用 textinputformat.record.delimiter 拆分记录。你可以找到一个简单的例子here .输入不是 XML,但你应该给你它以及如何继续的想法
    • 否则 Mahout 会提供 XmlInputFormat
  • 最后,可以使用 SparkContext.textFile 读取文件并稍后调整分区之间的记录跨越。从概念上讲,它的意思类似于创建滑动窗口或 partitioning records into groups of fixed size。 :

    • 使用mapPartitionsWithIndex 分区识别分区间损坏的记录,收集损坏的记录
    • 使用第二个 mapPartitionsWithIndex 修复损坏的记录

编辑:

还有比较新的spark-xml允许您通过标签提取特定记录的包:

val df = sqlContext.read
  .format("com.databricks.spark.xml")
   .option("rowTag", "foo")
   .load("bar.xml")

关于apache-spark - Spark 中的 xml 处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53659158/

相关文章:

java - 具有 spark 序列化问题的功能接口(interface)

Apache Beam 中对 SparkRunner 的 Python 支持

java - 在 Spark 流式转换中使用第三方不可序列化对象

python - 计算pyspark中两个数据帧的行之间的距离

python - 如何使用 "Trigger once"触发器控制 Spark Structured Streaming 中每个触发器处理的文件数量?

Java Apache Spark 将 TSV 格式转换为 JavaRDD

apache-spark - 使用 Apache spark java 搜索替换

docker - 如何远程运行spark-submit?

python - 正则表达式查找所有不包含_(下划线)和:(Colon) in PySpark Dataframe column的字符串

apache-spark - 如何在awaitTermination后获取流查询的进度?