Python队列链接对象运行异步协程与主线程输入

标签 python multithreading thread-safety python-asyncio

我有一个运行的脚本,其中主线程从 stdin 获取输入,然后使用队列将其传递给子线程。在子线程中,我使用 asyncio 协同程序在套接字上启动监听器并等待连接。建立连接后,我现在可以从主线程通过监听器发送数据。

一切似乎都运行良好,但由于 asyncio.BaseEventLoop 不是线程安全的,我会遇到问题吗?

这是我尝试解决使用阻塞库(如 python 的 cmd 模块和 asyncio)的问题。

我的代码如下。

import sys
import asyncio
from time import sleep
from threading import Thread
from queue import Queue

stdin_q = Queue()

clients = {} # task -> (reader, writer)

def client_connected_handler(client_reader, client_writer):
    # Start a new asyncio.Task to handle this specific client connection
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_reader, client_writer)

    def client_done(task):
        # When the tasks that handles the specific client connection is done
        del clients[task]

    # Add the client_done callback to be run when the future becomes done
    task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    # Handle the requests for a specific client with a line oriented protocol
    while True:

        cmd = yield from get_input()
        client_writer.write(cmd.encode())

        data = yield from client_reader.read(1024)

        print(data.decode(),end="",flush=True)

@asyncio.coroutine
def get_input():
  while True:
    try:
      return stdin_q.get()
    except:
      pass



class Control:

    def start(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = asyncio.get_event_loop()

        server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
        self.loop.run_forever()
        self.stop()

    def stop(self):    
        self.loop.stop()
        self.loop.close()

def fire_control():
    con = Control()
    con.start()

if __name__ == "__main__":

    stdin_q.put("\n")
    t = Thread(target=fire_control)
    t.start()
    sleep(2)
    _cmd = ""
    while _cmd.lower() != "exit":
        _cmd = input("")
        if _cmd == "":
          _cmd = "\r\n"

        stdin_q.put(_cmd)      

最佳答案

这不会正常工作,因为对 stdin_q.get() 的调用会阻塞您的事件循环。这意味着如果您的服务器有多个客户端,则所有客户端都将被第一个到达 stdin_q.get() 的客户端完全阻塞,直到您将数据发送到队列中。解决这个问题的最简单方法是使用 BaseEvent.loop.run_in_executor在后台 ThreadPoolExecutor 中运行 stdin_q.get,这样您就可以在不阻塞事件循环的情况下等待它:

@asyncio.coroutine
def get_input():
    loop = asyncio.get_event_loop()
    return (yield from loop.run_in_executor(None, stdin_q.get))  # None == use default executor.

编辑 (1/27/16):

有一个图书馆叫janus ,它提供了一个异步友好、线程安全的队列实现。

使用该库,您的代码将如下所示(我省略了未更改的部分):

...
import janus

loop = asyncio.new_event_loop()
stdin_q = janus.Queue(loop=loop)
...

@asyncio.coroutine
def get_input():
  loop = asyncio.get_event_loop()
  return (yield from stdin_q.async_q.get())

class Control:

    def start(self):
        asyncio.set_event_loop(loop)
        self.loop = asyncio.get_event_loop()

        server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
        self.loop.run_forever()
        self.stop()

    def stop(self):    
        self.loop.stop()
        self.loop.close()

...

if __name__ == "__main__":

    stdin_q.sync_q.put("\n")
    t = Thread(target=runner)
    t.start()
    sleep(2)
    _cmd = ""
    while _cmd.lower() != "exit":
        _cmd = input("")
        if _cmd == "":
          _cmd = "\r\n"

        stdin_q.sync_q.put(_cmd)

关于Python队列链接对象运行异步协程与主线程输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29324610/

相关文章:

Java ThreadLocal 静态?

c# - 在 Task.Run C# 中显示函数的运行时间

java - 即使在 list 中声明 Activity 也未找到异常

java - 在Android中什么时候应该使用锁,什么时候应该使用synchronized?有区别吗?

java - 艾略特波浪计算器,图表模式识别

python - 如何在 python 中删除行 CSV

java - 如何杀死等待Java中阻塞函数调用的线程?

java - 通过不同线程同时读取和更改变量

python - 将 Django 开发数据库从默认的 SQLite 更改为 PostgreSQL

python - 在python中生成具有三个类的3个圆圈数据集