python - 在类中实例化线程

标签 python multithreading concurrency concurrent.futures

我在 class 中有一个 class 并且想在第二个 class 中激活 threading 功能。本质上,下面的脚本是我的正确项目的可重现模板。

当我使用 @threading 时,我发现 showit 不可迭代,所以 tp.map 认为我没有列表。

但是,当我运行时:

if __name__ == '__main__':
    tp = ThreadPoolExecutor(5)
    print(tp.map(testit(id_str).test_first, id_int))
    for values in tp.map(testit(id_str).test_first, id_int):
        values

我没有遇到任何问题,除了我希望预期的输出打印出列表中的每个数字。但是,我想在类里面实现这一目标。

类似下面的内容:

from concurrent.futures import ThreadPoolExecutor
from typing import List

id_str = ['1', '2', '3', '4', '5']
id_int = [1, 2, 3, 4, 5]

def threaded(fn, pools=10):
    tp = ThreadPoolExecutor(pools)
    def wrapper(*args):
        return tp.map(fn, *args)  # returns Future object
    return wrapper

class testit:
    def __init__(self, some_list: List[str]) -> None:
        self._ids = some_list
        print(self._ids)

    def test_first(self, some_id: List[int]) -> None:
        print(some_id)

class showit(testit):
    def __init__(self, *args):
        super(showit, self).__init__(*args)
    
    @threaded
    def again(self):
        global id_int
        for values in self.test_first(id_int):
            print(values)

a = showit(id_str)
print(a.again())

错误:

  File "test_1.py", line 32, in <module>
    print(a.again())
  File "test_1.py", line 10, in wrapper
    return tp.map(fn, *args)  # returns Future object
  File "/Users/usr/opt/anaconda3/lib/python3.8/concurrent/futures/_base.py", line 600, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
TypeError: 'showit' object is not iterable

预期输出:

1
2
3
4
5

最佳答案

注意事项:

  1. 编写一个带有可选参数的装饰器需要与您完成它的方式有所不同。
  2. 我假设您希望 threaded 装饰器能够同时支持函数和方法,如果这不难实现的话。

装饰函数/方法可以接受任意数量的位置参数和关键字参数(但至少有一个位置参数)。什么时候 你调用包装函数最后一个位置参数必须是 将与池的 map 方法一起使用的可迭代对象。 当 map 将函数/方法作为辅助函数调用时,所有位置参数和关键字参数都将传递给该函数/方法,但最后一个位置参数现在将成为可迭代对象的一个​​元素。

from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial

def threaded(poolsize=10):
    def outer_wrapper(fn):
        wraps(fn)
        def wrapper(*args, **kwargs):
            """
            When the wrapper function is called,
            the last positional argument iterable will be an iterable
            to be used with the pool's map method.

            Then when map is invoking the original unwrapped function/method
            as a worker function, the last positional argument will be an element of
            that iterable.
            """

            # We are being called with an iterable as the first argument"
            # Construct a worker function if necessary:
            with ThreadPoolExecutor(poolsize) as executor:
                worker = partial(fn, *args[:-1], **kwargs)
                return list(executor.map(worker, args[-1]))
        return wrapper
    return outer_wrapper

iterable = [1, 2, 3]

@threaded(len(iterable))
def worker1(x, y, z=0):
    """
    y, the last positional argument, will be an iterable when the
    wrapper function is called or an element of the iterable
    when the actual, unwrapped function, worker1, is called.
    """
    return x + (y ** 2) - z

@threaded(3)
def worker2(x):
    return x.upper()

class MyClass:
    @threaded()
    def foo(self, x):
        return -x

# last positional argument is the iterable:
print(worker1(100, iterable, z=10))
print(worker2(['abcdef', 'ghijk', 'lmn']))
o = MyClass()
print(o.foo([3, 2, 1]))

打印:

[91, 94, 99]
['ABCDEF', 'GHIJK', 'LMN']
[-3, -2, -1]

关于python - 在类中实例化线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73854849/

相关文章:

c++ - 多线程 - 线程是否同时开始运行?

python - Pika:消费下一条消息,即使上一条消息未被确认

c++ - 无法修改其他线程中 ref 传递的值

algorithm - 如何最小化并行计算的运行时间?

java - 如何轮流监听两组线程获取synchronized section?

python - 将具有多个结构的PDB文件解析为数组

python - Pandas .read_json(JSON_URL)

java - 需要有关 Java Web 应用程序设计的帮助以执行后台任务

python - 从 Python 列表中列出重复文件

python 以一种特定模式解析 xml