我有一个 S3 存储桶,其中的对象的“上次修改时间”范围从非常旧到当前。我需要能够在窗口中找到具有上次修改标记的文件,然后将这些文件(JSON 格式)读入某种数据帧(pandas、spark 等)中。
我尝试收集文件,单独读取它们并通过以下代码附加,但速度非常慢:
session = boto3.session.Session(region_name=region)
#Gather all keys that have a modified stamp between max_previous_data_extracted_timestamp and start_time_proper
s3 = session.resource('s3', region_name=region)
bucket = s3.Bucket(args.sourceBucket)
app_body = []
for obj in bucket.objects.all():
obj_datetime = obj.last_modified.replace(tzinfo=None)
if args.accountId + '/Patient' in obj.key and obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
obj_df = pd.read_csv(obj.get()['Body'])
app_body.append(obj_df)
merged_dataframe = pd.concat(app_body)
逻辑是有效的,因为我只获取窗口内已修改的对象,但是,获取主体并附加到列表的下一部分在约 10K 文件上运行 30-45 分钟。必须有一种我没有想到的更好的方法来做到这一点。
最佳答案
Spark 是实现这一目标的一种方式。
当与包含大量文件的 S3 存储桶交谈时,我们始终需要记住,列出存储桶中的所有对象的成本很高,因为它一次返回 1000 个对象以及用于获取下一组对象的指针。这使得并行化变得非常困难,除非您了解结构并使用它来优化这些调用。
如果代码不起作用,我很抱歉,我使用 scala,但这应该几乎处于工作状态。
知道您的结构是bucket/account_identifier/Patient/Patient_identifier
:
# account_identifiers -- provided from DB
accounts_df = sc.parallelize(account_identifiers, number_of_partitions)
paths = accounts_df.mapPartitions(fetch_files_for_account).collect()
df = spark.read.json(paths)
def fetch_files_for_account(accounts):
s3 = boto3.client('s3')
result = []
for a in accounts:
marker = ''
while True:
request_result = s3.list_objects(Bucket=args.sourceBucket, Prefix=a)
items = request_result['Contents']
for i in items:
obj_datetime = i['LastModified'].replace(tzinfo=None)
if obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
result.append('s3://' + args.sourceBucket +'/' + i['Key'])
if not request_result['IsTruncated']:
break
else:
marker = request_result['Marker']
return iter(result)
映射分区将确保您没有实例化太多客户端。您可以使用 number_of_partitions
控制该数量。
您可以做的另一个优化是在调用 mapPartitions
后手动加载内容,而不是使用 collect()
。在该阶段之后,您将获得 JSON 内容的 String
,然后调用 spark.createDataFrame(records, schema)
。注意:您必须提供架构。
如果您没有 account_identifiers
或文件数量不会达到 100k 范围,您将必须列出存储桶中的所有对象,按 last_modified
进行过滤,基本上执行相同的调用:
spark.read.json(paths)
关于python - 将符合上次修改窗口的 S3 文件读入 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60310736/