azure - Pyspark - Create a JSON structure with all combinations based on a dataframe

Tags: azure, pyspark, databricks, azure-databricks

I have a PySpark dataframe with 3 columns:

  • databricksPath
  • countryPartition
  • yearPartition

I am creating this dataframe from Data Factory, based on values coming from widgets: /image/8zIuO.png

The PySpark dataframe: /image/ZcjZO.png

From this dataframe I want to build an output with every combination, i.e., return a JSON structure to ADF with dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath, 'yearPartition': yearPartition, 'countryPartition': countryPartition}) so that it can be consumed by a ForEach activity.

Example output:

"output": {
                "value": [
                    {
                        "country": "PT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "ES",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "IT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "BE",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "PT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "ES",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "IT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "BE",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    }
                    ]
                    } 
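For context, a minimal sketch (not part of the original question) of what the final exit call needs to produce: the exit value is handed back to ADF as a string, so an array of combination objects has to be serialized with json.dumps to stay parseable. The combinations list here is hypothetical, hand-built example data:

import json

# hypothetical, hand-built list; the point of the question is to derive
# this list automatically from the dataframe
combinations = [
    {'country': 'PT', 'year': '2022', 'databricksPath': '/notebooks/1.Project/Notebook_1'},
    {'country': 'ES', 'year': '2022', 'databricksPath': '/notebooks/1.Project/Notebook_1'},
]

# serialize to a JSON string so ADF receives the array shown above
dbutils.notebook.exit(json.dumps(combinations))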

The notebook I am using:

# Databricks notebook source
from pyspark.sql import functions as F 
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from pyspark.sql.functions import (
    col, lit, row_number, instr, expr, when, current_date, months_between,
    coalesce, concat_ws, sum as Sum, first, round, monotonically_increasing_id,
    date_format, concat, substring, count,
)
from pyspark.sql.window import Window
from pathlib import Path
from functools import reduce
from pyspark.sql import DataFrame
import traceback
import pyodbc
import uuid
import sys


# COMMAND ----------

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")


databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')

# COMMAND ----------

from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('databricksPath', StringType(), True),
  StructField('countryPartition', StringType(), True),
  StructField('yearPartition', StringType(), True)
  ])

# each widget value is a single string, so this yields one row holding the raw list strings
data2 = [(databricksPath,countryPartition,yearPartition)]
df = spark.createDataFrame(data=data2,schema=schema)

df2 = df.withColumn("databricksPath", concat_ws(",",col("databricksPath")))

display(df2)

# COMMAND ----------

dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath,'yearPartition': yearPartition,'countryPartition': countryPartition})

Can anyone help me achieve this?

Thanks!

Best Answer

You can use the following code to achieve this:

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")
#dbutils.widgets.text("partitionColumn", "['dbo.table1|country', 'dbo.table2|country_year']", "partitionColumn")

databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
#partitionColumn = dbutils.widgets.get('partitionColumn')

#creating a separate dataframe for each of the above
#(eval parses the Python-style list literal held in each widget string)
path_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('databricksPath'))],schema=['path'])
cp_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('countryPartition'))],schema=['country'])
y_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('yearPartition'))],schema=['year'])
#p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))],schema=['partition_col'])


#applying cross join to get all combination results.
from pyspark.sql.functions import broadcast
final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)
#final_df= broadcast(broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)).crossJoin(p_df)

#from pyspark.sql.functions import split
#fdf = final_df.select('country','year','path',split(final_df['partition_col'],'[|]').getItem(0).alias('table'),split(final_df['partition_col'],'[|]').getItem(1).alias('partition'))

#from pyspark.sql.functions import array
#fdf = fdf.withColumn('countryYear', array(col('country'),col('year')))

#collect the result dataframe as a list of dictionaries
output = [eval(i) for i in final_df.toJSON().collect()]
#output = [eval(i) for i in fdf.toJSON().collect()]

#returning the above output dictionary/JSON to data factory
import json
dbutils.notebook.exit(json.dumps(output))
  • With this code, the value of output will be an array of objects, exactly like the example output (2 paths × 4 countries × 1 year = 8 combinations).

[screenshot: the output array produced by the notebook]

  • When I run this notebook through a notebook activity in Azure Data Factory, it gives the following result:

[screenshot: notebook activity output in Azure Data Factory]

Update: here is the output image for understanding the updated requirement.
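As an aside (not from the original answer): since the widget values are plain Python-literal strings available on the driver, the same combination list can also be built without any Spark cross joins. A minimal sketch using itertools.product, with ast.literal_eval as a safer stand-in for eval:

import json
from ast import literal_eval
from itertools import product

# parse the Python-style list literals held in the widget strings
paths = literal_eval(dbutils.widgets.get('databricksPath'))
countries = literal_eval(dbutils.widgets.get('countryPartition'))
years = literal_eval(dbutils.widgets.get('yearPartition'))

# build every (country, year, path) combination on the driver
output = [
    {'country': c, 'year': y, 'databricksPath': p}
    for c, y, p in product(countries, years, paths)
]

dbutils.notebook.exit(json.dumps(output))

Either way, the ForEach activity in ADF can then consume the exit value with an items expression along the lines of @json(activity('Notebook1').output.runOutput), where 'Notebook1' is a hypothetical notebook activity name.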

Regarding azure - Pyspark - creating a JSON structure with all combinations based on a dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/73719094/
