python - 在线程内调用函数

标签 python multithreading

我正在用 Python 创建一个简单的 TCP 服务器-客户端脚本。服务器是线程化的,并为每个客户端连接创建一个新的工作线程/线程。到目前为止,我几乎已经编写了整个服务器模块的代码。但是我的函数名为 handle_clients() ,它为每个传入的客户端连接 fork ,变得非常长。为了提高代码的可读性,我想将 handle_clients() 拆分为多个小函数。我确实明白,当我将handle_client()拆分为更小的函数时,拆分函数应该包裹在互斥锁周围,以同步多个handle_clients()<之间的共享使用 线程。这样做实际上会降低程序的效率,因为handle_clients()必须等待其他线程解锁共享函数才能真正使用它。我的另一个想法是在 handle_clients() 线程中创建这些较小的函数作为线程。并等待这些线程使用 Thread.join() 完成,然后再继续。有更好的方法吗?

我的代码:

#!/usr/bin/python
import socket
import threading
import pandas as pd

class TCPServer(object):
    NUMBER_OF_THREADS = 0
    BUFFER = 4096
    threads_list = []

    def __init__(self, port, hostname):
        self.socket = socket.socket(
            family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.socket.bind((hostname, port))

    def listen_for_clients(self):
        self.socket.listen(5)
        while True:
            client, address = self.socket.accept()
            client_ID = client.recv(TCPServer.BUFFER)
            print(f'Connected to client: {client_ID}')

            if client_ID:
                TCPServer.NUMBER_OF_THREADS = TCPServer.NUMBER_OF_THREADS + 1
                thread = threading.Thread(
                    target=TCPServer.create_worker, args=(self, client, address, client_ID))
                TCPServer.threads_list.append(thread)
                thread.start()

            if TCPServer.NUMBER_OF_THREADS > 2:
                break

        TCPServer.wait_for_workers()

    def wait_for_workers():
        for thread in TCPServer.threads_list:
            thread.join()


    def create_worker(self, client, address, client_ID):
        print(f'Spawned a new worker for {client_ID}. Worker #: {TCPServer.NUMBER_OF_THREADS}')
        data_list = []
        data_frame = pd.DataFrame()
        client.send("SEND_REQUEST_TYPE".encode())
        request_type = client.recv(TCPServer.BUFFER).decode('utf-8')

        if request_type == 'KMEANS':
            print(f'Client: REQUEST_TYPE {request_type}')
            client.send("SEND_DATA".encode())

            while True:
                data = client.recv(TCPServer.BUFFER).decode('utf-8')
                if data == 'ROW':
                    client.send("OK".encode())
                    while True:
                        data = client.recv(TCPServer.BUFFER).decode('utf-8')
                        print(f'Client: {data}')
                        if data == 'ROW_END':
                            print('Data received: ', data_list)
                            series = pd.Series(data_list)
                            data_frame.append(series, ignore_index=True)
                            data_list = []
                            client.send("OK".encode())
                            break
                        else:
                            data_list.append(int(data))
                            client.send("OK".encode())

                elif data == 'DATA_END':
                    client.send("WAIT".encode())


            # (Vino) pass data to algorithm
            print('Data received from client {client_ID}: ', data_frame)

        elif request_type == 'NEURALNET':
            pass
        elif request_type == 'LINRIGRESSION':
            pass
        elif request_type == 'LOGRIGRESSION':
            pass



def main():
    port = input("Port: ")
    server = TCPServer(port=int(port), hostname='localhost')
    server.listen_for_clients()

if __name__ == '__main__':
    main()

注意:以下代码块是重复的,将在 handle_client() 函数中多次使用。

while True:
    data = client.recv(TCPServer.BUFFER).decode('utf-8')
    if data == 'ROW':
        client.send("OK".encode())
        while True:
            data = client.recv(TCPServer.BUFFER).decode('utf-8')
            print(f'Client: {data}')
            if data == 'ROW_END':
                print('Data received: ', data_list)
                series = pd.Series(data_list)
                data_frame.append(series, ignore_index=True)
                data_list = []
                client.send("OK".encode())
                break
            else:
                data_list.append(int(data))
                client.send("OK".encode())

    elif data == 'DATA_END':
        client.send("WAIT".encode())


# (Vino) pass data to algorithm
print('Data received from client {client_ID}: ', data_frame)

这是我想要放在单独函数中的 block ,并在 handle_client() 线程中调用它。

最佳答案

您的代码已经很长了,我不会深入探讨它,但会尽量保持通用性。

I do understand that when I split handle_client() into smaller functions, the split functions should be wrapped around mutex locks.

这并不是直接正确的,在线程之间,无论您的函数调用如何,您都必须使用锁来防止内存覆盖。

The server is threaded

看起来你正在做CPU密集型工作(我看到LINALGNEURALNET,...),在Python中使用线程是不合逻辑的,分派(dispatch) CPU 密集型负载,因为 GIL 将使线程之间的 CPU 使用量线性化。

在 Python 中并行化 CPU 密集型工作的方法是使用进程。

进程不共享内存,因此您可以在没有互斥体的情况下自由操作变量,但它们根本不会共享,我希望您的工作是独立的,因为它们不能共享任何状态。

如果你需要共享状态,避免锁,处理起来很复杂,这是死锁的方式,而且不可读,尝试用队列来实现你的“状态共享”,作为作业的管道,每个 worker 拉从队列中执行工作,然后推送到另一个队列,这样可以使事情保持清晰且易于理解。另外,还有线程和进程队列的实现,因此您几乎可以无缝地在两者之间切换。

if TCPServer.NUMBER_OF_THREADS > 2: break

嘿,当你有两个以上的线程、现有的主进程、杀死你的服务器时,你就会脱离主循环,我敢打赌,这就是你想要的。哦,如果您使用进程而不是线程,您应该预 fork 它们的池,因为它们的创建成本比线程更高。并重用它们,一个进程可以在完成一项工作后执行一项工作,它不必终止(通常使用 queues 将工作发送到您的进程)。

旁注:我会使用 HTTP 而不是原始 TCP 来实现此功能,以受益于请求、响应、错误报告、现有框架的概念以及使用现有客户端的能力(命令行中的 curl/wget、浏览器) ,Python 中的请求)。我将完全异步地实现这一点(不阻塞 HTTP 请求),例如创建作业的一个请求,以及以下请求以获取状态和结果,例如:

$ curl -X POST http://localhost/linalg/jobs/ -d '{your data}'
201 Created
Location: http://localhost/linalg/jobs/1

$ curl -XGET http://localhost/linalg/jobs/1
200 OK
{"status": "queued"}

过了一段时间...

$ curl -XGET http://localhost/linalg/jobs/1
200 OK
{"status": "in progress"}

过了一段时间...

$ curl -XGET http://localhost/linalg/jobs/1
200 OK
{"status": "done", "result": "..."}

为了实现这一点,已经完成了很多出色的工作,通常是 aiohttp , apistar ,等等。

关于python - 在线程内调用函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47119723/

相关文章:

python - 使用 collections.Iterable 在 python 中展平不规则列表时,列表不会作为最终对象返回

ios - Realm 线程困惑

ios - 我们可以允许从两个线程同时操作 NSMutableArray 的不同 "indexes"

android - 在android中,如果我在线程中运行网络请求但不想在响应返回之前刷新UI怎么办?

python - Python 实例变量是线程安全的吗?

c++ - 如何多次使用 uv_queue_work?

python - 如何使用 selenium、chrome 驱动程序和 python 关闭新建的选项卡

python - 尝试从 django 中的数据库显示图像时图像损坏

python - 如何使用 anchor 标记在 Tornado 中从一个 html 导航到另一个

Python:修改异常/错误输出