palantir-foundry - 如何在代码存储库中获取数据集的名称

标签 palantir-foundry

在代码库中用 Python 组合多个数据集时,我想将数据集名称放在第一列。但是我无法通过访问它的路径来弄清楚

@transform_df(
    Output("/folder/folder1/datasets/mydatset"),
    df1 = Input("A"),
    df2 = Input("B"),
)

def compute(df1, df2, df3):
    print(list(filter(os.path.isfile, os.listdir())))

如何从转换中获取我的数据集名称?

最佳答案

使用 @transform_df 装饰器是不可能的。然而,可以使用更强大的 @transform 装饰器。

API Documentation for @transform

使用 @transform 将导致您的函数参数变为 TransformInput 类型,而不是直接具有属性 path 的数据帧。请注意,在使用 @transform 时,您还需要手动引用和写入输出数据集。

例如:

@transform(
    out=Output("/path/to/my/output"),
    inp1=Input("/path/to/my/input1"),
    inp2=Input("/path/to/my/input2"),
)
def compute(out, inp1, inp2):
    # Add columns containing dataset paths.
    df1 = inp1.dataframe().withColumn("dataset_path", F.lit(inp1.path))
    df2 = inp2.dataframe().withColumn("dataset_path", F.lit(inp2.path))

    # For example.
    result = union_many(df1, df2, how="strict")

    # Write output manually
    out.write_dataframe(result)

但是请注意,数据集的路径是一个不稳定的标识符。如果有人要移动或重命名这些输入,可能会在您的管道中导致意外行为。

因此,对于生产流水线,我通常建议使用更稳定的标识符。要么是手动选择的硬编码(在这种情况下,您可以再次使用 @transform_df):

@transform_df(
    df1=Input("/path/to/my/input1"),
    df2=Input("/path/to/my/input2"),
)
def compute(df1, df2):
    df1 = df1.withColumn("input_dataset", F.lit("input_1"))
    df2 = df2.withColumn("input_dataset", F.lit("input_2"))
    # ...etc

或数据集的 RID,使用 inp1.rid 而不是 inp1.path

请注意,如果您有大量输入,所有这些方法都可以使用 python 的可变参数语法和推导式变得更简洁:

# Using path or rid
@transform(
    out=Output("/path/to/my/output"),
    inp1=Input("/path/to/my/input1"),
    inp2=Input("/path/to/my/input2"),
    # and many more...
)
def compute(out, **inps):
    # Add columns containing dataset rids (or paths).
    dfs = [
        inp.dataframe().withColumn("dataset_rid", F.lit(inp.rid))
        for key, inp in inps.items()
    ]

    # For example
    result = union_many(*dfs, how="strict")
    out.write_dataframe(result)


# Using manual keys, we can reuse the argument names as keys.
@transform_df(
    Output("/path/to/my/output"),
    df1=Input("/path/to/my/input1"),
    df2=Input("/path/to/my/input2"),
    # and many more...
)
def compute(**dfs):
    # Add columns containing dataset keys.
    dfs = [
        df.withColumn("dataset_key", F.lit(key))
        for key, df in dfs.items()
    ]

    # For example
    return union_many(*dfs, how="strict")

关于palantir-foundry - 如何在代码存储库中获取数据集的名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72758260/

相关文章:

pyspark - 如何在 Foundry 中解析大型压缩 csv 文件?

palantir-foundry - 代码工作簿 - 使用 hadoop_path 找不到文件

apache-spark - 将pyspark相关的JAR包安装到Foundry中

pyspark - 如何添加一列指示磁盘上文件的行号?

dictionary - 如何将数据集转换为存储库内的字典。我在 Foundry 中使用 pyspark

python - 如何在palantir-foundry中导入和使用Spark-Koalas

palantir-foundry - 将预训练的深度学习模型导入 Foundry Codeworkbooks

palantir-foundry - 是否可以在 Foundry Slate 中创建自定义输入小部件?

palantir-foundry - 如何更新 Palantir Foundry Ontology 编辑功能中的数组属性?

pyspark - 如何删除 "Missing transform attribute error"?