python - Pool 只执行一个线程而不是 4 个,如何使其无限?

标签 python multithreading python-2.7 concurrency python-multithreading

所以我正在开发一个 Python 小工具来对应用程序的 API 进行压力测试。

我有一个使用线程的非常好的脚本,但后来我读到它需要手动编码来维护 n 个并发线程(意味着,旧线程完成后立即启动新线程),以及这里的建议: How to start a new thread when old one finishes?就是使用ThreadPool,我尝试如下:

def test_post():
    print "Executing in " + threading.currentThread().getName() + "\n"
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"


if args.send:
    code, content = post()
    print (code, "\n")
    print (content)
elif args.test:
    # Create new threads
    print threads
    results_list = []
    pool = ThreadPool(processes=threads)
    results = pool.apply_async(test_post())
    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.
    # results = list(pool.imap_unordered(
    #     test_post(), ()
    # ))
    # thread_list = []
    # while threading.activeCount() <= threads:
    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)
    #     thread.start()
    #     thread_list.append(thread)
    print "Exiting Main Thread" + "\n"
else:
    print ("cant get here!")

当我调用脚本时,我得到一致的输出,例如:

4

Executing in MainThread

Exiting Main Thread

我不确定为什么......正如您在注释掉的 block 中看到的那样,我尝试了不同的方法,但它仍然只执行一次。

我的目标是让脚本循环运行,总是随时运行n个线程。 test_post (分别是 post )函数返回 HTTP 响应代码和内容 - 我想稍后在响应代码不是 200 OK 时使用它来打印/停止.

最佳答案

您的第一个问题是您已经在 MainThread 中调用了函数:

pool.apply_async(test_post())

...而不是传递 test_post 作为要在工作线程中执行的调用的参数:

pool.apply_async(test_post)
<小时/>

OP: I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n number of concurrent threads (meaning, starting new ones as soon as old ones finish) ...

您需要区分工作单元(作业、任务)和线程。首先使用池的全部目的是重用执行器,无论是线程还是进程。当池被实例化时,工作线程就已经被创建了,只要您不关闭池,所有初始线程都会保持事件状态。因此,您不必关心重新创建线程,只需在需要分发一些工作时调用现有池的池方法即可。池接受此作业(池方法调用)并从中创建任务。这些任务被放入无界队列中。每当工作人员完成任务时,它就会阻塞地尝试从这样的inqueueget()一个新任务。

<小时/>

OP: Pool only executes a single thread instead of 4...I tried different ways and it still does it only once.

pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

...是单调用、单任务生成作业。如果您想要多次执行 func,则必须多次调用 pool.apply_async(),或者使用映射池方法,例如

pool.map(func, iterable, chunksize=None)

...,它将一个函数映射到一个可迭代对象上。 pool.apply_async 是非阻塞的,这就是它是“异步”的原因。它立即返回一个 AsyncResult 对象,您可以(阻塞地)调用 .wait().get()

<小时/>

Through the comments it became clear, that you want endless and immediate replacements for finished tasks (self produced input-stream)...and the program should stop on KeyboardInterrupt or when a result does not have a certain value.

您可以使用 apply_asynccallback 参数在任何旧任务完成后立即安排新任务。困难在于如何同时处理主线程以防止整个脚本过早结束,同时保持对键盘中断的响应。让主线程在循环中休眠,使其仍然可以立即对键盘中断使用react,同时防止提前退出。如果结果应该停止程序,您可以让回调终止池。然后,主线程只需在其 sleep 循环中包含对池状态的检查。

import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool


def test_post(post_id):
    time.sleep(randint(1, 3))
    status_code = choice([200] * 9 + [404])
    return "{} {} Message no.{}: {}".format(
        datetime.now(), current_thread().name, post_id, status_code
    ), status_code


def handle_result(result):
    msg, code = result
    print(msg)
    if code != 200:
        print("terminating")
        pool.terminate()
    else:
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )


if __name__ == '__main__':

    N_WORKERS = 4

    post_cnt = count()

    pool = ThreadPool(N_WORKERS)

    # initial distribution
    for _ in range(N_WORKERS):
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

    try:
        while pool._state == 0:  # check if pool is still alive
            time.sleep(1)
    except KeyboardInterrupt:
        print(" got interrupt")

使用键盘中断的示例输出:

$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt

由于不需要的返回值而终止的示例输出:

$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating

请注意,在您的场景中,您还可以比 N_WORKERS 次更频繁地调用 apply_async ,以便初始分配有一些缓冲区以减少延迟。

关于python - Pool 只执行一个线程而不是 4 个,如何使其无限?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54698848/

相关文章:

python - pip 的 `--no-cache-dir` 有什么用?

python - 如何从 Win32_PnPEntity 实例中提取特定属性?

c - 如何在多线程应用程序中使用 freopen() 重定向线程虎钳日志文件中的标准输出和标准输入?

python - 在池多处理中写入文件 (Python 2.7)

python - 如何将文件内容修改为字符串,同时有权访问影响字符串中的行的方法?

python - 使用正则表达式来匹配特定数据

python - 将 JSON 数据转换为 CSV 格式

Python:如何使脚本在特定时间每天运行?

c# - 一般线程问题

c# - 如何在不使用 threadPool 和使用 EventWaitHandler 进行处理的情况下运行线程束中的进程数