Python多线程服务器一次可以处理一个客户端消息

标签 python multithreading

我正在尝试了解多线程,所以我编写了以下客户端/服务器应用程序,其中服务器向客户端发送命令,客户端检查该命令是否等于“a”,然后向服务器发送回复.

在服务器代码中我创建了两个套接字和一个线程;第一个套接字将命令发送(发布)到所有连接(订阅)的客户端。在线程中,第二个套接字等待来自客户端的任何回复,但由于该线程执行一些阻塞操作(例如,将客户端发送的信息存储在数据库中),因此它一次只能处理一个客户端,即使套接字(req-rep socket)可以同时接收多条消息。

server.py

import zmq
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG)


class Server(object):
    def __init__(self):
        self.context = zmq.Context()
        self.pub_port = 7777
        self.rep_port = 7778

        self.pub_socket = None
        self.rep_socket = None
        self.interface = "*"

    def bind_ports(self):
        logging.debug("[bind_ports] binding the ports....")
        self.pub_socket = self.context.socket(zmq.PUB)
        pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port)
        self.pub_socket.bind(pub_bind_str)

        self.rep_socket = self.context.socket(zmq.REP)
        rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port)
        self.rep_socket.bind(rep_bind_str)

    def received_info(self):
        while True:
            # logging.debug("[received_flow] ")
            cl_data = self.rep_socket.recv_json()
            logging.info("[received_data] data <{}>".format(flow))
            self.rep_socket.send(b"\x00")
            self.blocking_op(cl_data)

    def blocking_op(self, data):
        time.sleep(1) # simulating some blocking operations e.g. storing info in a database

    def push_instruction(self, cmd):
        logging.debug("[push_inst] Sending the instruction <%s> to the clients...",
        # logging.debug("[push_inst] Sending the instruction <%s> to the agents ...",
         cmd)
        instruction = {"cmd": cmd}
        self.pub_socket.send_json(instruction)

    def create_thread(self):
        thread = threading.Thread(target=self.received_info)
        thread.daemon = True
        thread.start()
        logging.debug("[create_thread] Thread created <{}>".format(
                                                        thread.is_alive()))

    def start_main_loop(self):
        logging.debug("[start_main_loop] Loop started....")
        self.bind_ports()
        self.create_thread()

        while True:
            cmd = input("Enter your command: ")
            self.push_instruction(cmd)

if __name__ == "__main__":
    Server().start_main_loop()

client.py

import zmq
import logging
import random
import time

logging.basicConfig(level=logging.DEBUG)

class Client(object):
    def __init__(self):
        self.context = zmq.Context()
        self.sub_socket = None
        self.req_socket = None

        self.pub_port = 7777
        self.req_port = 7778
        self.server_ip = 'localhost'

        self.client_id = ""

    def connect_to_server(self):
        logging.debug("[conn_to_serv] Connecting to the server ....")
        self.sub_socket = self.context.socket(zmq.SUB)
        self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
        conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port)
        self.sub_socket.connect(conn_str)

        self.req_socket = self.context.socket(zmq.REQ)
        req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port)
        self.req_socket.connect(req_conn_str)

    def get_instruction(self):
        inst = self.sub_socket.recv_json()
        logging.debug("[get_inst] Server sent inst")
        cmd = inst["cmd"]
        return cmd
    def send_flow(self, x, y):
        flow = {
            "client_id": self.client_id,
            "x": x,
            "y": y
        }
        self.req_socket.send_json(flow)

    def start_main_loop(self):
        logging.debug("starting the main loop ....")
        self.client_id = input("What is your id: ")
        self.connect_to_server()

        while True:
            inst = self.get_instruction()
            logging.info("[Main_loop] inst<{}>".format(inst))
            if inst == "a":
                # time.sleep(random.uniform(.6, 1.5))
                self.send_flow("xxx", "yyy")
                self.req_socket.recv()
                logging.debug("[main_loop] server received the flow")

if __name__ == "__main__":
    Client().start_main_loop()

如果有人可以帮助我改进服务器,使其可以同时服务多个客户端的消息,我将不胜感激。

最佳答案

我无法运行您的代码和测试,但如果您的问题是 receive_info() 阻塞,您可以通过启动一个线程来处理实际响应来绕过它。像这样的东西(可能包含拼写错误,我无法用您的代码进行测试 - 例如不知道 flow 是什么。)

def handle_response(self, data):
    logging.info("[received_data] data <{}>".format(flow))
    self.rep_socket.send(b"\x00")
    self.blocking_op(data)

def received_info(self):
        while True:
            # logging.debug("[received_flow] ")
            cl_data = self.rep_socket.recv_json()
            _t = threading.Thread(target=self.handle_response, args=(cl_data,))
            _t.start()

这具有您的 received_info() 循环,但不是在那里进行处理,而是启动一个新线程来处理响应。它需要完成所需的工作,然后线程就会终止,但您的 received_info() 将立即准备好等待新的响应。

关于Python多线程服务器一次可以处理一个客户端消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47866039/

相关文章:

java - Java线程什么时候空闲?

python - Django:存储多个动态排序的模型序列

python - 在 Colaboratory 中从 Google Drive 加载数据时遇到问题

python - 如何使用 NumPy 生成相邻索引

multithreading - "embarrassingly parallel"的反义词是什么?

java - java ThreadPoolExecutor 中的workerCountOf()方法

python - 如何判断字符串是否在字典中

python - 乘法作为基于另一个函数的重复加法?

c++ - 标准 C++11 是否保证 std::async(std::launch::async, func) 在单独的线程中启动 func?

c# - 线程静态懒惰