apache-spark - 如何从 pyspark 中的文件中匹配/提取多行模式

标签 apache-spark pyspark pyspark-sql

我有一个巨大的 rdf 三元组(主题谓词对象)文件,如下图所示。它提取粗体项目并具有以下输出的目标

  Item_Id | quantityAmount | quantityUnit | rank
    -----------------------------------------------
      Q31      24954         Meter       BestRank
      Q25       582         Kilometer    NormalRank  

enter image description here

我想提取遵循以下模式的行

  • 主题被赋予一个指针(<Q31> <prop/P1082> <Pointer_Q31-87RF> .)

  • 指针有一个排名(<Pointer_Q31-87RF> <rank> <BestRank>)
    和值指针(<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9>)

  • valuePointer 依次指向它的 Amount ( <value/cebcf9> <quantityAmount> "24954" ) 和 Unit ( <value/cebcf9> <quantityUnit> <Meter> )

通常的方法是逐行读取文件并提取上述模式中的每一个(使用 sc.textFile('inFile').flatMap(lambda x: extractFunc(x)) 然后通过不同的连接将它们组合起来这样它将提供上表。 有没有更好的方法来解决这个问题?我包括下面的文件示例。

<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954”
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .

最佳答案

如果可以使用\n<Q作为创建RDD元素的分隔符,那么解析数据 block 就变成了一个纯python任务。下面我创建了一个函数(基于您的示例)来使用正则表达式解析 block 文本并将 cols 信息检索到 Row 对象中(您可能必须调整正则表达式以反射(reflect)实际数据模式,即区分大小写、额外的空格等) :

  • 对于每个 RDD 元素,按'\n'(行模式)分割
  • 然后对于每一行,按 > < 分割进入列表y
  • 我们可以找到rank , quantityUnit通过检查 y[1]y[2]quantityAmount通过检查 y[1]Item_id通过检查 y[0]
  • 通过迭代所有必填字段创建 Row 对象,将缺失字段的值设置为 None

    from pyspark.sql import Row
    import re
    
    # skipped the code to initialize SparkSession
    
    # field names to retrieve
    cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']
    
    def parse_rdd_element(x, cols):
        try:
            row = {}
            for e in x.split('\n'):
                y = e.split('> <')
                if len(y) < 2:
                    continue
                if y[1] in ['rank', 'quantityUnit']:
                    row[y[1]] = y[2].split(">")[0]
                else:
                    m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                    if m:
                        row['quantityAmount'] = m.group(1)
                        continue
                    m = re.match('^(?:<Q)?(\d+)', y[0])
                    if m:
                        row['Item_Id'] = 'Q' + m.group(1)
            # if row is not EMPTY, set None to missing field
            return Row(**dict([ (k, row[k]) if k in row else (k, None) for k in cols])) if row else None
        except:
            return None
    

使用 newAPIHadoopFile() 和 \n<Q 设置 RDD作为分隔符:

rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n<Q'}
)

使用map函数将RDD元素解析为Row对象

rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]

将上面的RDD转成dataframe

df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit|      rank|
+-------+--------------+------------+----------+
|    Q31|         24954|       Meter|  BestRank|
|    Q25|           582|   Kilometer|NormalRank|
+-------+--------------+------------+----------+

一些注意事项:

  • 为了获得更好的性能,请使用 re.compile() 预编译所有正则表达式模式 在将它们传递给 parse_rdd_element() 函数之前。

  • 如果 \n 之间可能有空格/制表符和 <Q ,多个 block 将被添加到同一个RDD元素中,只需将RDD元素拆分为\n\s+<Q并替换 map()flatMap() .

引用:creating spark data structure from multiline record

关于apache-spark - 如何从 pyspark 中的文件中匹配/提取多行模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57499828/

相关文章:

hadoop - yarn Spark 作业调度较慢

apache-spark - 错误 : User did not initialize spark context

scala - 找不到记录器的附加程序 (org.apache.kafka.clients.producer.ProducerConfig)

apache-spark - 使用 Spark 读取 SAS sas7bdat 数据

apache-spark - 派斯帕克 : Dynamically prepare pyspark-sql query using parameters

python - 连续行之间的日期差异 - Pyspark Dataframe

Scala-Spark : Get the column names of the columns that contains null values

apache-spark - 在 Pyspark 中减去两个数组以获得一个新数组

python - 将 parquet 读取到 pandas FileNotFoundError

apache-spark - 如何配置Apache Spark 2.4.5连接到HIVE的MySQL元存储库?