python - 线程在加入时卡住

标签 python multithreading python-3.x concurrency

我正在运行一个线程池,该线程池出现随机错误。有时它可以工作,有时它会卡在这段代码的 pool.join 部分。我已经这样做好几天了,但找不到它何时工作或何时卡住之间的任何区别。请帮忙...

这是代码...

def run_thread_pool(functions_list):

    # Make the Pool of workers
    pool = ThreadPool()  # left blank to default to machine number of cores

    pool.map(run_function, functions_list)

    # close the pool and wait for the work to finish
    pool.close()
    pool.join()
    return

同样,这段代码也随机卡在 q.join(:

def run_queue_block(methods_list, max_num_of_workers=20):
    from views.console_output_handler import add_to_console_queue

    '''
    Runs methods on threads.  Stores method returns in a list.  Then outputs that list
    after all methods in the list have been completed.

    :param methods_list: example ((method name, args), (method_2, args), (method_3, args)
    :param max_num_of_workers: The number of threads to use in the block.
    :return: The full list of returns from each method.
    '''

    method_returns = []

    log = StandardLogger(logger_name='run_queue_block')

    # lock to serialize console output
    lock = threading.Lock()

    def _output(item):
        # Make sure the whole print completes or threads can mix up output in one line.
        with lock:
            if item:
                add_to_console_queue(item)
            msg = threading.current_thread().name, item
            log.log_debug(msg)

        return

    # The worker thread pulls an item from the queue and processes it
    def _worker():
        log = StandardLogger(logger_name='_worker')

        while True:
            try:
                method, args = q.get()  # Extract and unpack callable and arguments

            except:
                # we've hit a nonetype object.
                break

            if method is None:
                break

            item = method(*args)  # Call callable with provided args and store result
            method_returns.append(item)
            _output(item)

            q.task_done()

    num_of_jobs = len(methods_list)

    if num_of_jobs < max_num_of_workers:
        max_num_of_workers = num_of_jobs

    # Create the queue and thread pool.
    q = Queue()

    threads = []
    # starts worker threads.
    for i in range(max_num_of_workers):
        t = threading.Thread(target=_worker)
        t.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
        t.start()
        threads.append(t)

    for method in methods_list:
        q.put(method)

    # block until all tasks are done
    q.join()

    # stop workers
    for i in range(max_num_of_workers):
        q.put(None)
    for t in threads:
        t.join()

    return method_returns

我永远不知道什么时候它会起作用。它在大多数时候都有效,但大多数时候都不够好。什么可能导致这样的错误?

最佳答案

要允许您的队列加入第二个示例,您需要确保从队列中删除所有任务。

因此,在您的 _worker 函数中,即使无法处理任务,也将任务标记为已完成,否则队列将永远不会被清空,并且您的程序将挂起。

def _worker():
    log = StandardLogger(logger_name='_worker')

    while True:
        try:
            method, args = q.get()  # Extract and unpack callable and arguments

        except:
            # we've hit a nonetype object.
            q.task_done()
            break

        if method is None:
            q.task_done()
            break

        item = method(*args)  # Call callable with provided args and store result
        method_returns.append(item)
        _output(item)

        q.task_done()

关于python - 线程在加入时卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41479972/

相关文章:

python - 这个数组怎么分?

python - django - 模板过滤器中的多个排除

python - 如何使用 pymongo 的 collection.update_one 或 update_many 指定不安全/安全写入

python-3.x - 用 Bokeh 为两条曲线之间的区域着色

python - 有没有办法杀死一个线程?

java - 多线程文件处理和数据库批量插入

python - 从字典中获取值而不链接到内存位置

python - 如何舍入 datetime64 值

python - 改变 matplotlib 读取图像的方式

c#线程访问其他线程