python - 多线程 S3 下载不会终止

标签 python multithreading amazon-s3 boto

我正在使用 python boto 和线程从 S3 快速下载许多文件。我在我的程序中多次使用它并且效果很好。但是,有时它不起作用。在那一步中,我尝试在 32 核机器 (Amazon EC2 cc2.8xlarge) 上下载 3,000 个文件。

下面的代码实际上成功地下载了每个文件(除了有时会出现 httplib.IncompleteRead 错误,该错误无法通过重试得到修复)。然而,32 个线程中只有 10 个左右实际终止,程序就这样挂起。不知道这是为什么。所有文件都已下载,所有线程应该已退出。当我下载较少的文件时,它们会执行其他步骤。我已经减少到使用单个线程下载所有这些文件(可以工作但速度非常慢)。任何见解将不胜感激!

from boto.ec2.connection import EC2Connection
from boto.s3.connection import S3Connection
from boto.s3.key import Key

from boto.exception import BotoClientError
from socket import error as socket_error
from httplib import IncompleteRead

import multiprocessing
from time import sleep
import os

import Queue
import threading

def download_to_dir(keys, dir):
    """
    Given a list of S3 keys and a local directory filepath,
    downloads the files corresponding to the keys to the local directory.
    Returns a list of filenames.
    """
    filenames = [None for k in keys]

    class DownloadThread(threading.Thread):

        def __init__(self, queue, dir):
            # call to the parent constructor
            threading.Thread.__init__(self)
            # create a connection to S3
            connection = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
            self.conn = connection
            self.dir = dir
            self.__queue = queue

        def run(self):
            while True:
                key_dict = self.__queue.get()
                print self, key_dict
                if key_dict is None:
                    print "DOWNLOAD THREAD FINISHED"
                    break
                elif key_dict == 'DONE': #last job for last worker
                    print "DOWNLOADING DONE"
                    break
                else: #still work to do!
                    index = key_dict.get('idx')
                    key = key_dict.get('key')
                    bucket_name = key.bucket.name
                    bucket = self.conn.get_bucket(bucket_name)
                    k = Key(bucket) #clone key to use new connection
                    k.key = key.key

                    filename = os.path.join(dir, k.key)
                    #make dirs if don't exist yet
                    try:
                        f_dirname = os.path.dirname(filename)
                        if not os.path.exists(f_dirname):
                            os.makedirs(f_dirname)
                    except OSError: #already written to
                        pass

                    #inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10
                    RETRIES = 5 #attempt at most 5 times
                    wait = 1
                    for i in xrange(RETRIES):
                        try:
                            k.get_contents_to_filename(filename)
                            break
                        except (IncompleteRead, socket_error, BotoClientError), e:
                            if i == RETRIES-1: #failed final attempt
                                raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e))
                                break
                            wait *= 2
                            sleep(wait)

                    #put filename in right spot!
                    filenames[index] = filename

    num_cores = multiprocessing.cpu_count()

    q = Queue.Queue(0)

    for i, k in enumerate(keys):
        q.put({'idx': i, 'key':k})
    for i in range(num_cores-1):
        q.put(None) # add end-of-queue markers
    q.put('DONE') #to signal absolute end of job

    #Spin up all the workers
    workers = [DownloadThread(q, dir) for i in range(num_cores)]
    for worker in workers:
        worker.start()

    #Block main thread until completion
    for worker in workers:
        worker.join() 

    return filenames

最佳答案

升级到 AWS SDK 版本 1.4.4.0 或更新版本,或者严格使用 2 个线程。旧版本有一个 limit最多 2 个同时连接。这意味着如果您启动 2 个线程,您的代码将运行良好;如果您启动 3 个或更多,您一定会看到不完整的读取和耗尽的超时。

你会看到,虽然 2 个线程可以大大提高你的吞吐量,但超过 2 个线程并没有太大变化,因为你的网卡一直很忙。

关于python - 多线程 S3 下载不会终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11441412/

相关文章:

java - 线程卡在 readUTF() 中

multithreading - 在Perl中实现看门狗

python - AWS使用Lambda将文件从S3存储桶下载到Windows本地目录

python - Boto (Python)-反转的遗愿 list

python - 从一组文档中找到最相似的文档(最近的邻居)

python - 在Pycharm中运行脚本时如何自动将焦点切换到python控制台?

python - 如何使用保存在 HDF5 文件中的 Keras 训练模型进行预测?

c - 不同内核上的多个线程读取同一组文件

amazon-s3 - S3 发布 InvalidPolicyDocument 存储桶依赖?

python - Kivy:使用一个切换按钮来更改另一个切换按钮的状态