我正在尝试编写一个 python 脚本,它使用子进程模块将文件从一个 s3 存储桶复制到另一个存储桶。但是,为了提高性能,我尝试并行运行具有不同前缀的单独同步命令。
到目前为止我已经尝试过脚本没有终止,我不确定子进程是否同时运行。
import subprocess
prefix = ['prefix1','prefix2','prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'
commands = []
for p in prefix:
command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
commands.append(command)
procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]
for p in procs:
p.wait()
有更好的方法吗?感谢您的帮助。
最佳答案
因为您要传入 subprocess.PIPE
,不同的进程将在等待输出时阻塞。您需要运行一个单独的进程来与每个 aws 实例进行通信。一种可能性是使用 Python 的多处理:
import subprocess
import multiprocessing
def worker(command, queue):
# Don't use shell here, we can avoid the overhead
proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
# Read from the aws command output and send it off the queue as soon
# as it's available
for line in proc.stdout:
queue.put(line.rstrip("\r\n"))
# Notify the listener that we're done
queue.put(None)
def main():
# Move all work to a function so it's multiprocessing safe, see
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
# Note: Adding trailing slash so "source_bucket" + "prefix" is a valid S3 URI
prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
source_bucket = 's3://source/'
dest_bucket = 's3://dest/'
# Create a queue to gather final messages from each worker
queue = multiprocessing.Queue()
procs = []
for p in prefixes:
# Pass in --no-progress so the progress messages aren't shown
# displaying those messages is complicated, and requires quite a bit
# of work to make sure they don't interfer with each other
# Correct the command syntax here to use all the variables
# Need to pass in the prefix to the dest URI as well so the same structure is
# maintained
# Use a argv style call here so we can avoid bringing the shell into this
command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
# Hand off the work to a worker to read from the pipe to prevent each
# spawned aws instance from blocking
proc = multiprocessing.Process(target=worker, args=(command, queue))
proc.start()
procs.append(proc)
# Read from the Queue to show the output
left = len(procs)
while left > 0:
msg = queue.get()
if msg is None:
# This means a worker is done
left -= 1
else:
# Just print out the output, doing it in one process to prevent any
# collision possibilities
print(msg)
# Clean up
for proc in procs:
proc.join()
if __name__ == "__main__":
main()
关于python - 如何使用 Python 为不同的前缀同时运行 AWS S3 sync 命令,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69315951/