hadoop - 如何从 pyspark rdd 或分区确定原始 s3 输入文件名

标签 hadoop amazon-s3 apache-spark pyspark pyspark-sql

我正在使用 pyspark 流式处理来自 S3 的 ETL 输入文件。

我需要能够建立所有原始输入文件的审计线索 在 s3://上,我的 Parquet 输出在 hdfs://上结束。

给定一个dstream、rdd,甚至是一个特定的rdd分区,是否有可能 确定 s3 中输入数据的原始文件名?

目前我知道的唯一方法是采取 rdd.toDebugString() 并尝试解析它。然而,这感觉真的很 hacky 并且不 在某些情况下工作。例如,解析调试输出对我的批处理模式导入不起作用 我也在做(使用 sc.TextFile("s3://...foo/*") 样式的 glob)。

有没有人有确定原始文件名的明智方法?

似乎其他一些 spark 用户过去也有过这个问题,因为 示例:

http://apache-spark-user-list.1001560.n3.nabble.com/Access-original-filename-in-a-map-function-tt2831.html

谢谢!

最佳答案

我们遇到了同样的问题,而且文件足够小,所以我们使用了 sc.wholeTextFiles("s3:...foo/*") .

创建 ("<path/filename>","<content>") 的 RDD我们将文件名附加到文件内容以供使用。

How to convert RDD[(String, String)] into RDD[Array[String]]?

关于hadoop - 如何从 pyspark rdd 或分区确定原始 s3 输入文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33974501/

相关文章:

hadoop - SQOOP-将配置单元中的ORC格式表导出到DB2数据库

apache-spark - 使用 lit() 在 Spark 中创建新列

scala - 如何使用 JohnSnowLabs NLP 拼写校正模块 NorvigSweetingModel?

java - 使用 Java 将 AWS 文件上传到 S3

python - Boto3 没有将 zip 文件上传到 S3 python

android - AWS S3 TransferService 上传失败但没有错误

apache-spark - 如何在没有 hive-site.xml 的情况下将 Spark SQL 连接到远程 Hive Metastore(通过节俭协议(protocol))?

hadoop - Hive按日期分区,为什么要用string类型?为什么不是int?

sql - Apache Impala的迭代函数

hadoop - 如何在Hive表中实现触发器概念