python - 获取pyspark中执行者任务的任务id

标签 python apache-spark pyspark

我有一个rdd.foreachPartition(some_function) pyspark 中的操作。 some_function函数将执行器中当前任务的数据写入所有执行器公共(public)位置的文件中(例如 hdfs 或 s3 存储桶)。

现在,如果我在所有执行程序中使用相同的文件名,该文件将被替换并仅保留最后写入的文件。因此,我正在寻找一个唯一的标识符来表示每个任务,从而表示每个文件名。

我对任务 ID 很感兴趣,因为它是唯一的。但找不到任何地方,如何在pyspark中获取任务ID。我发现了一些东西similar在 scala/java 中,但不在 pyspark 中。

更新:按照建议,我查看了 this 。然而,这给出了 stageID,而我对每个阶段内各个任务的 taskID 感兴趣。不过,stageId 答案也是一个值得了解的好信息。

最佳答案

不久前我也遇到了同样的问题。

我通过在文件名中使用 datetime.now() 解决了这个问题,并且绝对确定我不会获得与我使用的相同的文件名:

rdd.mapPartitionsWithIndex(lambda x,y: (x,y), preservesPartitioning = True).foreachPartition(lambda x: some_function(x))

这将为您提供位于 x[0] 的唯一分区 ID,您可以将其添加到任何文件名以确保唯一性

关于python - 获取pyspark中执行者任务的任务id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50186170/

相关文章:

python - 生成器在列表理解中过早结束

RserveException : eval failed when running R on Databricks

apache-spark - 如何将数据从数据帧导出到文件数据 block

apache-spark - 来自 .. 错误有效负载 : '400' t active 的无效状态代码 "requirement failed: Session isn'

python - 汉诺塔递归调用

python - 如何在 xls 末尾跳过 pandas 数据框中的行

python - 检查字符串列表和列表是否为空

hadoop - 提交 Spark 的工作绩效

scala - 如何使用值降序排列我的 Spark 结果元组

apache-spark - PySpark:如何在 For 循环中附加数据帧