Python:如何异步 for 循环

标签 python python-3.x asynchronous python-asyncio

是否可以使用asyncio在Python中迭代generator对象?我创建了一个名为 hash_generator() 的简单函数,它返回一个唯一的哈希值。现在我决定对循环进行基准测试,迭代打印 100,000 个哈希值需要大约 8 秒的时间。我可以异步运行它以最小化时间吗?我阅读了它的文档,但我很困惑。我想探索异步,并且想从这个问题开始。

import hashlib
import string
import random
import time


def hash_generator():
    """Return a unique hash"""
    prefix = int(time.time())
    suffix = (random.choice(string.ascii_letters) for i in range(10))
    key = ".".join([str(prefix), str("".join(suffix))])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()


"""Iterating the hashes and printing the time it loaded"""
hashes = (hash_generator() for i in range(100000))
time_before = time.time()
[print(i) for i in hashes]
time_after = time.time()
difference = time_after - time_before
print('Loaded in {0:.2f}sec'.format(difference))
# 40503CBA2DAE
# ...
# A511068F4945
# Loaded in 8.81sec

编辑1

random.choice() 函数是程序运行时间过长的主要原因。我重新创建了下面的函数,使用当前时间和来自 os.urandom 的随机字符串(低冲突)作为值。我尝试了多线程,但任务并没有以最快的速度运行,而是变得太慢了。任何重构下面代码的建议总是受欢迎的。

import hashlib
import time
import os
import timeit


def hash_generator():
    """Return a unique hash"""
    prefix = str(time.time())
    suffix = str(os.urandom(10))
    key = "".join([prefix, suffix])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()


"""Iterating the hashes and printing the time it loaded"""
print(timeit.timeit(hash_generator, number=100000), "sec")
# 0.497149389999322 sec

编辑2

在 Jack Taylor 和 Stackoverflowers 的帮助下,我可以通过使用超过 100 万次迭代的多处理来看到差异。我对下面的代码进行了基准测试。

import hashlib
import time
import os
import timeit
import multiprocessing


def hash_generator(_=None):
    """Return a unique hash"""
    prefix = str(time.time())
    suffix = str(os.urandom(10))
    key = "".join([prefix, suffix])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()


# Allows for the safe importing of the main module
if __name__ == "__main__":
    start_time = time.time()
    number_processes = 4
    iteration = 10000000
    pool = multiprocessing.Pool(number_processes)
    results = pool.map(hash_generator, range(iteration))
    pool.close()
    pool.join()
    end_time = time.time()
    pool_runtime = end_time - start_time
    print('(Pool) Loaded in: {0:.5f} sec'.format(pool_runtime))

    ordinary_runtime = timeit.timeit(hash_generator, number=iteration)
    print('(Ordinary) Loaded in: {0:.5f} sec'.format(ordinary_runtime))
<小时/>
iteration = 10
(Pool) Loaded in: 1.20685 sec
(Ordinary) Loaded in: 0.00023 sec

iteration = 1000
(Pool) Loaded in: 0.72233 sec
(Ordinary) Loaded in: 0.01767 sec

iteration = 1000
(Pool) Loaded in: 0.99571 sec
(Ordinary) Loaded in: 0.01208 sec

iteration = 10,000
(Pool) Loaded in: 1.07876 sec
(Ordinary) Loaded in: 0.12652 sec

iteration = 100,000
(Pool) Loaded in: 1.57068 sec
(Ordinary) Loaded in: 1.23418 sec

iteration = 1,000,000
(Pool) Loaded in: 4.28724 sec
(Ordinary) Loaded in: 11.56332 sec

iteration = 10,000,000
(Pool) Loaded in: 27.26819 sec
(Ordinary) Loaded in: 132.68170 sec

最佳答案

看起来顺序版本可能会更好。传统观点认为,在 Python 中,对于 I/O 密集型作业(文件读/写、网络),您可以通过使用事件循环或多线程以及 CPU 密集型作业(例如计算哈希值)来获得加速。 )您可以通过使用多个进程来提高速度。

但是,我采用了您的版本并使用 concurrent.futures 和进程池重写了它,结果非但没有加快速度,反而慢了 10 倍。

代码如下:

from concurrent import futures
import hashlib
import string
import random
import time

def hash_generator():
    """Return a unique hash"""
    prefix = int(time.time())
    suffix = (random.choice(string.ascii_letters) for i in range(10))
    key = ".".join([str(prefix), str("".join(suffix))])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()

def main(workers = None):
    """Iterating the hashes and printing the time it loaded"""
    time_before = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        worker_count = executor._max_workers
        jobs = (executor.submit(hash_generator) for i in range(100000))
        for future in futures.as_completed(jobs):
            print(future.result())
    time_after = time.time()
    difference = time_after - time_before
    print('Loaded in {0:.2f}sec with {1} workers'.format(difference, worker_count))

if __name__ == '__main__':
    main()

# 2BD6056CC0B4
# ...
# D0A6707225EB
# Loaded in 50.74sec with 4 workers

对于多个进程,启动和停止不同进程以及进程间通信会产生一些开销,这可能就是多进程版本比顺序版本慢的原因,即使它使用了所有 CPU核心。

您还可以尝试使用集群将工作分配到多台计算机上,和/或用较低级别的语言编写算法(Go 对我来说是一个不错的选择)。但我不知道这是否值得你花时间。

关于Python:如何异步 for 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50522534/

相关文章:

python - 比较表中的字段和 django-orm 中的计算字段

Python socket ConnectionResetError : [Errno 54] Connection reset by peer vs socket. 错误:[Errno 104] Connection reset by peer

python - 将 python dict 转换为列表列表

C# - API 是否应该有 "async"修饰符?

python - 覆盖 Django 中的特定管理 css 文件

python - 我应该如何使用 pyparsing 组织我的函数?

c# - 新库中的异步与非异步方法

c# - 验证从异步调用报告的进度是否在单元测试中正确报告

python - 使用鼠标移动 QtWidgets.QtWidget

python - 使用 pathlib 时,出现错误 : TypeError: invalid file: PosixPath ('example.txt' )