python - 将符合上次修改窗口的 S3 文件读入 DataFrame

标签 python python-3.x pandas apache-spark boto3

我有一个 S3 存储桶,其中的对象的“上次修改时间”范围从非常旧到当前。我需要能够在窗口中找到具有上次修改标记的文件,然后将这些文件(JSON 格式)读入某种数据帧(pandas、spark 等)中。

我尝试收集文件,单独读取它们并通过以下代码附加,但速度非常慢:

session = boto3.session.Session(region_name=region)

#Gather all keys that have a modified stamp between max_previous_data_extracted_timestamp and start_time_proper
s3 = session.resource('s3', region_name=region)
bucket = s3.Bucket(args.sourceBucket)
app_body = []
for obj in bucket.objects.all():
    obj_datetime = obj.last_modified.replace(tzinfo=None)
    if args.accountId + '/Patient' in obj.key and obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
        obj_df = pd.read_csv(obj.get()['Body'])
        app_body.append(obj_df)

merged_dataframe = pd.concat(app_body)

逻辑是有效的,因为我只获取窗口内已修改的对象,但是,获取主体并附加到列表的下一部分在约 10K 文件上运行 30-45 分钟。必须有一种我没有想到的更好的方法来做到这一点。

最佳答案

Spark 是实现这一目标的一种方式。

当与包含大量文件的 S3 存储桶交谈时,我们始终需要记住,列出存储桶中的所有对象的成本很高,因为它一次返回 1000 个对象以及用于获取下一组对象的指针。这使得并行化变得非常困难,除非您了解结构并使用它来优化这些调用。

如果代码不起作用,我很抱歉,我使用 scala,但这应该几乎处于工作状态。

知道您的结构是bucket/account_identifier/Patient/Patient_identifier:

# account_identifiers -- provided from DB
accounts_df = sc.parallelize(account_identifiers, number_of_partitions)
paths = accounts_df.mapPartitions(fetch_files_for_account).collect()
df = spark.read.json(paths)


def fetch_files_for_account(accounts):
    s3 = boto3.client('s3')
    result = []
    for a in accounts:
        marker = ''
        while True:
            request_result = s3.list_objects(Bucket=args.sourceBucket, Prefix=a)
            items = request_result['Contents']
            for i in items:
                obj_datetime = i['LastModified'].replace(tzinfo=None)
                if obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
                    result.append('s3://' + args.sourceBucket +'/' + i['Key'])
            if not request_result['IsTruncated']:
                break
            else:
                marker = request_result['Marker']
    return iter(result)

映射分区将确保您没有实例化太多客户端。您可以使用 number_of_partitions 控制该数量。

您可以做的另一个优化是在调用 mapPartitions 后手动加载内容,而不是使用 collect()。在该阶段之后,您将获得 JSON 内容的 String,然后调用 spark.createDataFrame(records, schema)。注意:您必须提供架构。

如果您没有 account_identifiers 或文件数量不会达到 100k 范围,您将必须列出存储桶中的所有对象,按 last_modified 进行过滤,基本上执行相同的调用:

spark.read.json(paths)

关于python - 将符合上次修改窗口的 S3 文件读入 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60310736/

相关文章:

python - 在 Python 中链接子进程

python - 有没有一种方法可以将 PyUnicodeObject 变量转换为 PyObject 类型?

python - 从python中的矩阵中提取子元素

python-3.x - 如何从 UCI Machine Learning Repository 将数据集(.data 和 .names)直接读入 Python DataFrame

python - 如何获取 csv 中其条目与文本文件中的条目匹配的元素的索引

python - 有没有办法在SSD上写入,如果在写入过程中断开连接,数据不会丢失?

Python Web 套接字仅发送 1 条消息

Python CGI 将正确的文本返回给curl,但浏览器显示尾随零

python - 获取 TypeError : ord() expected string of length 1, 但 int 发现错误

python - `pandas to_json` 和 `read_json` 之间的文件大小差异较大