python - 如何将 PySpark 数据框插入到具有雪花模式的数据库中?

标签 python database pyspark

我正在使用 PySpark 计算一个数据框,如果这个数据库有一个 snowflake schema,我该如何将这个数据框附加到我的数据库中? ?

如何指定拆分数据框的方式,以便将类似 CSV 的数据放入多个联合表中?

我的问题并不特定于 Pyspark,同样的问题也可以问到 pandas。

最佳答案

将从 CSV 中提取的数据帧附加到由雪花模式组成的数据库:

  1. 从雪花模式中提取数据。
  2. 从外部数据源提取新数据。
  3. 合并两个数据集。
  4. 将组合转换为一组维度表和事实表以匹配雪花模式。
  5. 将转换后的数据帧加载到数据库中,覆盖现有数据。

例如对于具有以下架构的数据框,从外部源中提取:

StructType([StructField('customer_name', StringType()),
            StructField('campaign_name', StringType())])
def entrypoint(spark: SparkSession) -> None:
  extracted_customer_campaigns = extract_from_external_source(spark)

  existing_customers_dim, existing_campaigns_dim, existing_facts = (
    extract_from_snowflake(spark))

  combined_customer_campaigns = combine(existing_campaigns_dim,
                                        existing_customers_dim,
                                        existing_facts,
                                        extracted_customer_campaigns)

  new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(
    combined_customer_campaigns)

  load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)


def combine(campaigns_dimension: DataFrame,
            customers_dimension: DataFrame,
            facts: DataFrame,
            extracted_customer_campaigns: DataFrame) -> DataFrame:
  existing_customer_campaigns = facts.join(
    customers_dimension,
    on=['customer_id']).join(
    campaigns_dimension, on=['campaign_id']).select('customer_name',
                                                    'campaign_name')

  combined_customer_campaigns = extracted_customer_campaigns.union(
    existing_customer_campaigns).distinct()

  return combined_customer_campaigns


def transform_to_snowflake(customer_campaigns: DataFrame) -> (
    DataFrame, DataFrame):
  customers_dim = customer_campaigns.select(
    'customer_name').distinct().withColumn(
    'customer_id', monotonically_increasing_id())

  campaigns_dim = customer_campaigns.select(
    'campaign_name').distinct().withColumn(
    'campaign_id', monotonically_increasing_id())

  facts = (
    customer_campaigns.join(customers_dim,
                            on=['customer_name']).join(
      campaigns_dim, on=[
        'campaign_name']).select('customer_id', 'campaign_id'))

  return campaigns_dim, customers_dim, facts

这是一种简单的函数式方法。可以通过编写增量来优化,而不是为每个 ETL 批处理重新生成雪花键。

此外,如果提供了一个单独的外部 CSV 包含要删除的记录,则可以类似地提取它,然后在转换之前从组合数据框中减去,以删除那些现有记录。

最后,问题仅涉及附加到表格。如果需要合并/更新,则需要手动添加其他步骤 Spark itself does not support it .

关于python - 如何将 PySpark 数据框插入到具有雪花模式的数据库中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62303471/

相关文章:

python - 使用多处理在 Python 中读取多个 HDF5 文件

python - 如何创建一个宏来遍历 Pandas Dataframe 中的所有列?

python - 查找两个列表之间差异的快速方法,适用于所有情况

python - 将日期列转换为时间戳列 + 小时的最有效方法

python - 类型错误 : string indices must be integers - while trying to read secret

python - 更新目录时运行 Python 脚本

java - 如何从两个不同的表中获取数据

database - 交错表术语

apache-spark - 使用 Spark 读取 SAS sas7bdat 数据

c# - 如何使用 C# 或通过 Interconnect 或 Passthru 将数据从 TitanDB 导出到 MySQL