python - Python 中的线程同步

标签 python multithreading sockets client

我目前正在开展一个学校项目,其中的任务之一是建立一个线程服务器/客户端系统。系统中的每个客户端在连接到服务器时都应该在服务器上分配自己的线程。此外,我希望服务器运行其他线程,一个涉及来自命令行的输入,另一个涉及向所有客户端广播消息。但是,我无法让它按照我想要的方式运行。看起来线程互相阻塞。我希望我的程序在服务器监听连接的客户端的“同时”从命令行获取输入,等等。

我是 python 编程和多线程的新手,尽管我认为我的想法很好,但我并不惊讶我的代码不起作用。问题是我不太确定如何实现不同线程之间的消息传递。我也不确定如何正确实现资源锁定命令。我将在这里发布我的服务器文件和客户端文件的代码,我希望有人可以帮助我。我认为这实际上应该是两个相对简单的脚本。我尝试在某种程度上尽可能好地评论我的代码。

import select
import socket
import sys
import threading
import client

class Server:

#initializing server socket
def __init__(self, event):
    self.host = 'localhost'
    self.port = 50000
    self.backlog = 5
    self.size = 1024
    self.server = None
    self.server_running = False
    self.listen_threads = []
    self.local_threads = []
    self.clients = []
    self.serverSocketLock = None
    self.cmdLock = None
    #here i have also declared some events for the command line input
    #and the receive function respectively, not sure if correct
    self.cmd_event = event
    self.socket_event = event

def openSocket(self):
    #binding server to port
    try: 
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((self.host, self.port))
        self.server.listen(5)
        print "Listening to port " + str(self.port) + "..."
    except socket.error, (value,message):
        if self.server:
            self.server.close()
        print "Could not open socket: " + message
        sys.exit(1)

def run(self):
    self.openSocket()

    #making Rlocks for the socket and for the command line input

    self.serverSocketLock = threading.RLock()
    self.cmdLock = threading.RLock()

    #set blocking to non-blocking
    self.server.setblocking(0)
    #making two threads always running on the server,
    #one for the command line input, and one for broadcasting (sending)
    cmd_thread = threading.Thread(target=self.server_cmd)
    broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients])
    cmd_thread.daemon = True
    broadcast_thread.daemon = True
    #append the threads to thread list
    self.local_threads.append(cmd_thread)
    self.local_threads.append(broadcast_thread)

    cmd_thread.start()
    broadcast_thread.start()


    self.server_running = True
    while self.server_running:

        #connecting to "knocking" clients
        try:
            c = client.Client(self.server.accept())
            self.clients.append(c)
            print "Client " + str(c.address) + " connected"

            #making a thread for each clientn and appending it to client list
            listen_thread = threading.Thread(target=self.listenToClient,args=[c])
            self.listen_threads.append(listen_thread)
            listen_thread.daemon = True
            listen_thread.start()
            #setting event "client has connected"
            self.socket_event.set()

        except socket.error, (value, message):
            continue

    #close threads

    self.server.close()
    print "Closing client threads"
    for c in self.listen_threads:
        c.join()

def listenToClient(self, c):

    while self.server_running:

        #the idea here is to wait until the thread gets the message "client
        #has connected"
        self.socket_event.wait()
        #then clear the event immidiately...
        self.socket_event.clear()
        #and aquire the socket resource
        self.serverSocketLock.acquire()

        #the below is the receive thingy

        try:
            recvd_data = c.client.recv(self.size)
            if recvd_data == "" or recvd_data == "close\n":
                print "Client " + str(c.address) + (" disconnected...")
                self.socket_event.clear()
                self.serverSocketLock.release()
                return

            print recvd_data

            #I put these here to avoid locking the resource if no message 
            #has been received
            self.socket_event.clear()
            self.serverSocketLock.release()
        except socket.error, (value, message):
            continue            

def server_cmd(self):

    #this is a simple command line utility
    while self.server_running:

        #got to have a smart way to make this work

        self.cmd_event.wait()
        self.cmd_event.clear()
        self.cmdLock.acquire()


        cmd = sys.stdin.readline()
        if cmd == "":
            continue
        if cmd == "close\n":
            print "Server shutting down..."
            self.server_running = False

        self.cmdLock.release()


def broadcast(self, clients):
    while self.server_running:

        #this function will broadcast a message received from one
        #client, to all other clients, but i guess any thread
        #aspects applied to the above, will work here also

        try:
            send_data = sys.stdin.readline()
            if send_data == "":
                continue
            else:
                for c in clients:
                    c.client.send(send_data)
            self.serverSocketLock.release()
            self.cmdLock.release()
        except socket.error, (value, message):
            continue

if __name__ == "__main__":
e = threading.Event()
s = Server(e)
s.run()

然后是客户端文件

import select
import socket
import sys
import server
import threading

class Client(threading.Thread):

#initializing client socket

def __init__(self,(client,address)):
    threading.Thread.__init__(self) 
    self.client = client 
    self.address = address
    self.size = 1024
    self.client_running = False
    self.running_threads = []
    self.ClientSocketLock = None

def run(self):

    #connect to server
    self.client.connect(('localhost',50000))

    #making a lock for the socket resource
    self.clientSocketLock = threading.Lock()
    self.client.setblocking(0)
    self.client_running = True

    #making two threads, one for receiving messages from server...
    listen = threading.Thread(target=self.listenToServer)

    #...and one for sending messages to server
    speak = threading.Thread(target=self.speakToServer)

    #not actually sure wat daemon means
    listen.daemon = True
    speak.daemon = True

    #appending the threads to the thread-list
    self.running_threads.append(listen)
    self.running_threads.append(speak)
    listen.start()
    speak.start()

    #this while-loop is just for avoiding the script terminating
    while self.client_running:
        dummy = 1

    #closing the threads if the client goes down
    print "Client operating on its own"
    self.client.close()

    #close threads
    for t in self.running_threads:
        t.join()
    return

#defining "listen"-function
def listenToServer(self):
    while self.client_running:

        #here i acquire the socket to this function, but i realize I also
        #should have a message passing wait()-function or something
        #somewhere
        self.clientSocketLock.acquire()

        try:
            data_recvd = self.client.recv(self.size)
            print data_recvd
        except socket.error, (value,message):
            continue

        #releasing the socket resource
        self.clientSocketLock.release()

#defining "speak"-function, doing much the same as for the above function       
def speakToServer(self):
    while self.client_running:
        self.clientSocketLock.acquire()
        try:
            send_data = sys.stdin.readline()
            if send_data == "close\n":
                print "Disconnecting..."
                self.client_running = False
            else:
                self.client.send(send_data)
        except socket.error, (value,message):
            continue

        self.clientSocketLock.release()

if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()

我意识到这需要您阅读相当多的代码行,但正如我所说,我认为其中的概念和脚本本身应该非常容易理解。如果有人能帮助我以正确的方式同步我的线程,我将非常感激 =)

提前致谢

---编辑---

好的。因此,我现在将代码简化为仅包含服务器和客户端模块中的发送和接收函数。连接到服务器的客户端有自己的线程,两个模块中的发送和接收函数都在自己单独的线程中进行操作。这就像一个魅力,服务器模块中的广播功能将它从一个客户端获得的字符串回显给所有客户端。到目前为止一切顺利!

我希望我的脚本做的下一件事是在客户端模块中采用特定命令,即“关闭”来关闭客户端,并将所有正在运行的线程加入线程列表中。我使用事件标志来通知listenToServer和主线程speakToServer线程已读取输入“close”。看起来主线程跳出了 while 循环并启动了应该加入其他线程的 for 循环。但它卡在这里。即使在设置事件标志时 server_running 应设置为 False,listenToServer 线程中的 while 循环似乎也永远不会停止。

我在这里仅发布客户端模块,因为我猜想使这两个线程同步的答案也与同步客户端和服务器模块中的更多线程有关。

import select
import socket
import sys
import server_bygg0203
import threading
from time import sleep

class Client(threading.Thread):

#initializing client socket

def __init__(self,(client,address)):

threading.Thread.__init__(self) 
self.client = client 
self.address = address
self.size = 1024
self.client_running = False
self.running_threads = []
self.ClientSocketLock = None
self.disconnected = threading.Event()

def run(self):

#connect to server
self.client.connect(('localhost',50000))

#self.client.setblocking(0)
self.client_running = True

#making two threads, one for receiving messages from server...
listen = threading.Thread(target=self.listenToServer)

#...and one for sending messages to server
speak = threading.Thread(target=self.speakToServer)

#not actually sure what daemon means
listen.daemon = True
speak.daemon = True

#appending the threads to the thread-list
self.running_threads.append((listen,"listen"))
self.running_threads.append((speak, "speak"))
listen.start()
speak.start()

while self.client_running:

    #check if event is set, and if it is
    #set while statement to false

    if self.disconnected.isSet():
        self.client_running = False 

#closing the threads if the client goes down
print "Client operating on its own"
self.client.shutdown(1)
self.client.close()

#close threads

#the script hangs at the for-loop below, and
#refuses to close the listen-thread (and possibly
#also the speak thread, but it never gets that far)

for t in self.running_threads:
    print "Waiting for " + t[1] + " to close..."
    t[0].join()
self.disconnected.clear()
return

#defining "speak"-function      
def speakToServer(self):

#sends strings to server
while self.client_running:
    try:
        send_data = sys.stdin.readline()
        self.client.send(send_data)

        #I want the "close" command
        #to set an event flag, which is being read by all other threads,
        #and, at the same time set the while statement to false

        if send_data == "close\n":
            print "Disconnecting..."
            self.disconnected.set()
            self.client_running = False
    except socket.error, (value,message):
        continue
return

#defining "listen"-function
def listenToServer(self):

#receives strings from server
while self.client_running:

    #check if event is set, and if it is
    #set while statement to false

    if self.disconnected.isSet():
        self.client_running = False
    try:
        data_recvd = self.client.recv(self.size)
        print data_recvd
    except socket.error, (value,message):
        continue
return

if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()

稍后,当我启动并运行这个服务器/客户端系统时,我将在我们实验室的一些电梯模型上使用该系统,每个客户端都会接收楼层命令或“向上”和“向下”调用。服务器将运行分配算法并更新最适合所请求订单的客户端上的电梯队列。我意识到还有很长的路要走,但我想应该一次就迈出一步 =)

希望有人有时间研究一下这个问题。提前致谢。

最佳答案

我发现这段代码的最大问题是,您立即要做的事情太多,无法轻松调试您的问题。由于逻辑变得非线性,线程可能变得极其复杂。特别是当您必须担心与锁同步时。

您看到客户端相互阻塞的原因是您在服务器的listenToClient()循环中使用serverSocketLock的方式。老实说,这不完全是您现在代码的问题,但当我开始调试它并将套接字变成阻塞套接字时,它就成了问题。如果您将每个连接放入其自己的线程中并从中读取数据,则没有理由在此处使用全局服务器锁。它们都可以同时从自己的套接字读取数据,这就是线程的目的。

这是我给你的建议:

  1. 摆脱所有不需要的锁和额外线程,从头开始
  2. 让客户端像您一样进行连接,并将它们放入自己的线程中。只需让他们每秒发送数据即可。验证您是否可以让多个客户端连接和发送,并且您的服务器正在循环和接收。一旦您完成了这一部分的工作,您就可以继续进行下一部分。
  3. 现在您已将套接字设置为非阻塞。当数据未准备好时,这会导致它们在循环中旋转得非常快。由于您正在线程化,因此应该将它们设置为阻塞。然后读取器线程将简单地等待数据并立即响应。

当线程访问共享资源时使用锁。显然,您需要在任何时候线程尝试修改服务器属性,例如列表或值。但当他们在自己的私有(private)套接字上工作时则不然。

您用来触发读者的事件在这里似乎没有必要。您已收到客户端,然后启动线程。这样就可以开始了。

简而言之...一次一点点地简化和测试。当它工作时,添加更多。现在有太多线程和锁。

以下是 listenToClient 方法的简化示例:

def listenToClient(self, c):
    while self.server_running:
        try:
            recvd_data = c.client.recv(self.size)
            print "received:", c, recvd_data
            if recvd_data == "" or recvd_data == "close\n":
                print "Client " + str(c.address) + (" disconnected...")
                return

            print recvd_data

        except socket.error, (value, message):
            if value == 35:
                continue 
            else:
                print "Error:", value, message  

关于python - Python 中的线程同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9521113/

相关文章:

.Net Thread vs ThreadPool vs SerialPort 通信任务

java - 如何使用不可变对象(immutable对象)解决两个编写器的竞争条件

c# - C#UDPClient未通过外部IP接收数据包

sockets - 使用 Docker 时,已建立的连接不会出现在 netstat 中

python - 如何在Python中返回url路径的子字符串?

python - 当用户运行脚本时,如何在 Python 中写入文本文件?

python - 为什么列出 `my_list+[' foo' ]` faster than ` new_list = list(my_list); python 中的 new_list.append ('foo' )`?

python - 从 csv 文件中加载 url 列表,并针对相同的数据对它们进行逐一解析

Java/JavaFX : Poll a database for single value update,,同时保持 GUI 中的响应能力

java - 从 Swing 按钮 ActionPerformed 访问 DatagramSocket 对象