python - Efficiently downloading files asynchronously with requests

Tags: python python-3.x performance python-requests

I want to download files as quickly as possible with Python. Here is my code:

import pandas as pd
import requests
from requests_futures.sessions import FuturesSession
import os
import pathlib
from timeit import default_timer as timer


class AsyncDownloader:
    """Download files asynchronously"""

    __urls = set()
    __dest_path = None
    __user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
    __read_timeout = 60
    __connection_timeout = 30
    __download_count = 0  # unlimited
    # http://www.browserscope.org/?category=network
    __worker_count = 17  # No of threads to spawn
    __chunk_size = 1024
    __download_time = -1
    __errors = []

    # TODO Fetch only content of a specific type from a csv
    # TODO Improve code structure so that it can be used as a commandline tool

    def set_source_csv(self, source_path, column_name):
        self.source_path = source_path
        self.column_name = column_name

        try:
            my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
        except ValueError:
            print("The column name doesn't exist")
            return
        else:
            # No exception was raised; collect the URLs from each chunk
            for chunk in my_csv:
                AsyncDownloader.__urls.update(getattr(chunk, self.column_name))

    def set_destination_path(self, dest_path):
        if dest_path.endswith('/'):
            dest_path = dest_path[:-1]
        self.dest_path = dest_path
        # TODO Add exception in case we can't create the directory
        pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
        if os.access(self.dest_path, os.W_OK):
            AsyncDownloader.__dest_path = pathlib.Path(self.dest_path).resolve()

    def set_user_agent(self, useragent):
        self.useragent = useragent
        AsyncDownloader.__user_agent = self.useragent

    def set_connection_timeout(self, ctimeout_secs):
        self.timeout_secs = ctimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__connection_timeout = self.timeout_secs

    def set_read_timeout(self, rtimeout_secs):
        self.timeout_secs = rtimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__read_timeout = self.timeout_secs

    def set_download_count(self, file_count):
        self.file_count = file_count
        if self.file_count > 0:
            AsyncDownloader.__download_count = self.file_count

    def set_worker_count(self, worker_count):
        self.worker_count = worker_count
        if self.worker_count > 0:
            AsyncDownloader.__worker_count = self.worker_count

    def set_chunk_size(self, chunk_size):
        self.chunk_size = chunk_size
        if self.chunk_size > 0:
            AsyncDownloader.__chunk_size = self.chunk_size

    def print_urls(self):
        print(AsyncDownloader.__urls)

    def get_download_time(self):
        return AsyncDownloader.__download_time

    def get_errors(self):
        return AsyncDownloader.__errors

    def download(self):
        start = timer()
        try:
            session = FuturesSession(max_workers=AsyncDownloader.__worker_count)
            session.headers.update({'user-agent': AsyncDownloader.__user_agent})
            # requests has no session-wide timeout setting; build a
            # (connect, read) tuple and pass it with every request below
            timeout = (AsyncDownloader.__connection_timeout,
                       AsyncDownloader.__read_timeout)

            results = []
            # Count files accurately even when a download is skipped because the file already exists
            file_count = 0

            for url in AsyncDownloader.__urls:
                filename = os.path.basename(url)
                # check if we need only a limited number of files
                if AsyncDownloader.__download_count != 0:
                    # No need to download the file if it already exists
                    if (AsyncDownloader.__dest_path / filename).is_file():
                        file_count += 1
                        continue
                    elif file_count < AsyncDownloader.__download_count:
                        file_count += 1
                        results.append(session.get(url, timeout=timeout, stream=True))
                else:
                    if not (AsyncDownloader.__dest_path / filename).is_file():
                        results.append(session.get(url, timeout=timeout, stream=True))

            for result in results:
                # wait for the response to complete, if it hasn't already
                response = result.result()
                filename = os.path.basename(response.url)
                if response.status_code == 200:
                    with open(AsyncDownloader.__dest_path / filename, 'wb') as fd:
                        for chunk in response.iter_content(chunk_size=AsyncDownloader.__chunk_size):
                            if chunk:  # filter out keep-alive new chunks
                                fd.write(chunk)

            end = timer()
            AsyncDownloader.__download_time = end - start

        except requests.exceptions.HTTPError as errh:
            AsyncDownloader.__errors.append("HTTP Error: " + str(errh))
        except requests.exceptions.ConnectionError as errc:
            AsyncDownloader.__errors.append("Error Connecting: " + str(errc))
        except requests.exceptions.Timeout as errt:
            AsyncDownloader.__errors.append("Timeout Error: " + str(errt))
        except requests.exceptions.RequestException as err:
            AsyncDownloader.__errors.append("Something else went wrong: " + str(err))

The code below makes a very bad assumption. In effect, I am assuming that the first URL will finish first, which is of course not correct.

# wait for the response to complete, if it hasn't already
response = result.result()

How can I make sure that, in an efficient way, only completed requests are processed, instead of relying on an assumption like the one above?

I would appreciate any other suggestions on how to improve performance.

Kind regards

Best answer

Even if the connections complete in order, you are still processing the files sequentially: the second file has to wait for the first one to be written, and so on. So the best thing you can do is process everything in parallel (this works despite the GIL, because I/O operations such as writing to disk and reading from the network release it). Basically, use the regular requests library (not requests-futures) and create one future/thread per request plus its file handling, as in the sketch below.
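A minimal sketch of that idea, using only requests and the standard library. The names download_one/download_all, the (30, 60) timeout and the 17 workers are illustrative choices, not part of the original code; each worker both reads the response body and writes it to disk, and as_completed handles results in whatever order they actually finish:

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def download_one(session, url, dest_dir, chunk_size=1024):
    # Runs entirely inside a worker thread: both the network read
    # and the disk write happen in parallel with the other downloads
    filename = os.path.join(dest_dir, os.path.basename(url))
    with session.get(url, stream=True, timeout=(30, 60)) as response:
        response.raise_for_status()
        with open(filename, 'wb') as fd:
            for chunk in response.iter_content(chunk_size=chunk_size):
                fd.write(chunk)
    return filename

def download_all(urls, dest_dir, workers=17):
    session = requests.Session()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(download_one, session, url, dest_dir): url
                   for url in urls}
        # as_completed yields each future as soon as it finishes,
        # so no assumption is made about which request completes first
        for future in as_completed(futures):
            try:
                print('saved', future.result())
            except requests.RequestException as err:
                print('failed', futures[future], err)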

There are more ways to make it faster, such as downloading the next chunk while the previous one is being written (i.e. two threads, one for the request and one for the file handling), or reading chunks in parallel by issuing multi-part (Range) requests. That is "download accelerator" territory, and you probably don't want that kind of complexity in your code; a rough sketch of the multi-part idea follows anyway.
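For illustration only, here is a rough sketch of a multi-part download. It assumes the server supports Range requests, reports a Content-Length, and that the file is larger than the number of parts; download_ranges and the choice of four parts are hypothetical, and error handling is left out:

from concurrent.futures import ThreadPoolExecutor

import requests

def download_ranges(url, dest, parts=4):
    # Ask for the total size, then split it into byte ranges
    size = int(requests.head(url, allow_redirects=True).headers['Content-Length'])
    step = size // parts
    bounds = [(i * step, size - 1 if i == parts - 1 else (i + 1) * step - 1)
              for i in range(parts)]

    def fetch(start, end):
        # Range header ends are inclusive
        headers = {'Range': 'bytes=%d-%d' % (start, end)}
        return start, requests.get(url, headers=headers).content

    with ThreadPoolExecutor(max_workers=parts) as executor:
        chunks = list(executor.map(lambda b: fetch(*b), bounds))

    # Reassemble the parts in offset order
    with open(dest, 'wb') as fd:
        for start, data in sorted(chunks):
            fd.write(data)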

Edit: Also, chunked downloading is lazy. That means you are only issuing the initial requests in parallel, while the actual chunked file download happens sequentially, because it is done in the main thread. So your current approach is not much better than a fully synchronous one. The advice above still stands; a sketch of how to keep requests-futures while moving the body download into the worker threads is shown below.
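If you would rather keep requests-futures, newer versions (1.x) accept a standard requests response hook, which to my understanding runs in the background thread; consuming the body inside the hook therefore moves the actual download off the main thread. A hedged sketch, where urls and save_dir are assumed inputs and save_dir must already exist:

import os
from requests_futures.sessions import FuturesSession

urls = ['http://example.com/a.bin']  # assumed input
save_dir = 'downloads'               # assumed, must already exist

def save_to_disk(response, *args, **kwargs):
    # The hook runs in the worker thread, so the body is downloaded
    # and written in parallel rather than in the main thread
    filename = os.path.join(save_dir, os.path.basename(response.url))
    with open(filename, 'wb') as fd:
        for chunk in response.iter_content(chunk_size=1024):
            fd.write(chunk)

session = FuturesSession(max_workers=17)
futures = [session.get(url, stream=True, hooks={'response': save_to_disk})
           for url in urls]
for future in futures:
    future.result()  # re-raises any exception from a worker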

This Q&A on efficiently downloading files asynchronously with requests in Python comes from a question on Stack Overflow: https://stackoverflow.com/questions/48628510/
