python - 如何将线程局部变量与 ThreadPoolExecutor 一起使用?

标签 python multithreading threadpoolexecutor

我想要线程有一些局部变量,使用thread.Thread可以优雅地完成这样的操作:

class TTT(threading.Thread):
    def __init__(self, lines, ip, port):
        threading.Thread.__init__(self)
        self._lines = lines;
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def run(self):
        for line in self._lines:
            query = genquery(line)
            length = len(query)
            head = "0xFFFFFFFE"
            q = struct.pack('II%ds'%len(query),  head,  length, query)
            sock.send(q)
            sock.recv(4)
            length,  = struct.unpack('I',  sock.recv(4))
            result = ''
            remain = length
            while remain:
                t = sock.recv(remain)
                result+=t
                remain-=len(t)
            print(result)

如您所见,_lines _sock _sts _cts 这些变量在每个线程中都是独立的。

但是对于concurrent.future.ThreadPoolExecutor来说,似乎就没那么容易了。使用ThreadPoolExecutor,我怎样才能让事情变得优雅?(不再有全局变量)


新编辑

class Processor(object):
    def __init__(self, host, port):
        self._sock = self._init_sock(host, port)

    def __call__(self, address, adcode):
        self._send_data(address, adcode)
        result = self._recv_data()
        return json.loads(result)

def main():
    args = parse_args()
    adcode = {"shenzhen": 440300}[args.city]

    if args.output:
        fo = open(args.output, "w", encoding="utf-8")
    else:
        fo = sys.stdout
    with open(args.file, encoding=args.encoding) as fi, fo,\
        ThreadPoolExecutor(max_workers=args.processes) as executor:
        reader = csv.DictReader(fi)
        writer = csv.DictWriter(fo, reader.fieldnames + ["crfterm"])
        test_set = AddressIter(args.file, args.field, args.encoding)
        func = Processor(args.host, args.port)
        futures = map(lambda x: executor.submit(func, x, adcode), test_set)
        for row, future in zip(reader, as_completed(futures)):
            result = future.result()
            row["crfterm"] = join_segs_tags(result["segs"], result["tags"])
            writer.writerow(row)

最佳答案

使用与您现在的布局非常相似的布局将是最简单的事情。使用普通对象代替 Thread,并在 __call__ 中实现逻辑,而不是 run:

class TTT:
    def __init__(self, lines, ip, port):
        self._lines = lines;
        self._sock = initsock(ip, port)
        self._sts = 0
        self._cts = 0

    def __call__(self):
        ...
        # do stuff to self

添加__call__类的方法使得可以像调用常规函数一样调用实例。事实上,普通的函数就是具有这样方法的对象。您现在可以将一堆 TTT 实例传递给 mapsubmit .

或者,您可以将初始化吸收到任务函数中:

def ttt(lines, ip, port):
    sock = initsock(ip, port)
    sts = cts = 0
    ...

现在您可以使用正确的参数列表调用 submit 或使用每个参数的可迭代值调用 map

对于本示例,我更喜欢前一种方法,因为它在执行程序外部打开端口。执行器任务中的错误报告有时可能很棘手,我更愿意使打开端口的容易出错的操作尽可能透明。

编辑

根据您的相关问题,我相信您要问的真正问题是关于函数局部变量(也自动是线程局部变量),而不是在同一线程上的函数调用之间共享。但是,您始终可以在函数调用之间传递引用。

关于python - 如何将线程局部变量与 ThreadPoolExecutor 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45895251/

相关文章:

java - 如果我们直接调用 run 方法会发生什么?

java - ThreadPoolExecutor 创建的线程在关闭后仍保持运行

python eval ('import foo' )引发 SyntaxError

python - 在python中获取棋盘图案中的像素值

python - 如何从 Django 模块中的给定字段中获取引用为 ForeignKey 的所有对象

java - 如何配置ThreadPoolExecutor来限制线程数

java - Executor框架中所有线程都执行完毕时捕获一个事件

python - 使用seaborn.objects接口(interface)绘制回归置信区间(v0.12)

c++ - BOOST::thread 删除可连接线程有什么问题?

java - 使用 Spring 在 Tomcat 启动期间委托(delegate)处理