python - Spark - 如何从 S3 读取多个具有文件名的多个 Json 文件

标签 python apache-spark pyspark apache-spark-sql databricks

我在 S3 中有很多行分隔的 json 文件,我想在 spark 中读取所有这些文件,然后读取 json 中的每一行,并为该行输出一个字典/行,文件名作为一列。我将如何以有效的方式在 python 中执行此操作?每个 json 大约 200 MB。

这是一个文件示例(这样会有 200,000 行),将此文件命名为 class_scores_0219:

{"name": "Maria C", "class":"Math", "score":"80", "student_identification":22}
{"name": "Maria F", "class":"Physics", "score":"90", "student_identification":12}
{"name": "Fink", "class":"English", "score":"75", "student_identification":7}

输出的 DataFrame 将是(为简单起见,只显示一行):

+-------------------+---------+-------+-------+------------------------+
|     file_name     |  name   | class | score | student_identification |
+-------------------+---------+-------+-------+------------------------+
| class_scores_0219 | Maria C | Math  |    80 |                     22 |
+-------------------+---------+-------+-------+------------------------+

我已经使用这个设置了 s3 key /访问 key :sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY) (访问 key 也是一样),但需要以不同的方式连接。

我愿意接受最有效的任何选项,我可以提供文件列表并将其馈入,或者我可以连接到 boto3 并提供前缀。我是 Spark 的新手,所以非常感谢所有帮助。

最佳答案

您可以通过使用 spark 本身来实现这一点。

只需添加一个带有 input_file_names 的新列,您就会得到所需的结果

from pyspark.sql.functions import input_file_name
df = spark.read.json(path_to_you_folder_conatining_multiple_files)
df = df.withColumn('fileName',input_file_name())

如果你想读取多个文件,你可以将它们作为文件列表传递

files = [file1, file2, file3]
df = spark.read.json(*files)

或者,如果您的文件列表与通配符匹配,那么您可以像下面这样使用它

df = spark.read.json('path/to/file/load2020*.json')

或者您可以使用 boto3 列出文件夹中的所有对象,然后创建所需文件的列表并将其传递给 df。

希望对您有所帮助。

关于python - Spark - 如何从 S3 读取多个具有文件名的多个 Json 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61602862/

相关文章:

python - 为什么 subprocess.run 的输出出现得太早?

apache-spark - Apache Mahout 和 Apache Spark 的 MLlib 有什么区别?

scala - “value $ is not a member of StringContext” - 缺少 Scala 插件?

scala - Spark : Split is not a member of org. apache.spark.sql.Row

pyspark - 无法在Databricks中使用pyspark读取json文件

apache-spark - 在 PySpark Structured Streaming 中对多个输出流使用单个流式 DataFrame

python - 如何强制 virtualenv 从 pypi 安装最新的 setuptools 和 pip?

python - 检索随机数字时出现类型错误

Python worker 连接失败

python - 考虑到强制重定向的多组规则?