Python Asyncio 阻塞协程

标签 python asynchronous zeromq publish-subscribe python-asyncio

我正在尝试编写一个基于 Asyncio 和使用 ZeroMQ 实现的发布/订阅设计模式的简单程序。发布者有 2 个协程;一个监听传入的订阅,另一个将值(通过 HTTP 请求获得)发布给订阅者。订阅者订阅特定参数(在本例中为城市名称),并等待值(该城市的温度)。

这是我的代码:

publisher.py

#!/usr/bin/env python

import json
import aiohttp
import aiozmq
import asyncio
import zmq


class Publisher:
    BIND_ADDRESS = 'tcp://*:10000'

    def __init__(self):
        self.stream = None
        self.parameter = ""

    @asyncio.coroutine
    def main(self):
        self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
        tasks = [
            asyncio.async(self.subscriptions()),
            asyncio.async(self.publish())]
        print("before wait")
        yield from asyncio.wait(tasks)
        print("after wait")

    @asyncio.coroutine
    def subscriptions(self):
        print("Entered subscriptions coroutine")
        while True:
            print("New iteration of subscriptions loop")
            received = yield from self.stream.read()
            first_byte = received[0][0]
            self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
            # Subscribe request
            if first_byte == 1:
                print("subscription request received for parameter "+self.parameter)
            # Unsubscribe request
            elif first_byte == 0:
                print("Unsubscription request received for parameter "+self.parameter)


    @asyncio.coroutine
    def publish(self):
        print("Entered publish coroutine")
        while True:
            if self.parameter:
                print("New iteration of publish loop")

                # Make HTTP request
                url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
                response = yield from aiohttp.request('GET', url)
                assert response.status == 200
                content = yield from response.read()

                # Decode JSON string
                decoded_json = json.loads(content.decode())

                # Get parameter value
                value = decoded_json["main"]["temp"]

                # Publish fetched values to subscribers
                message = bytearray(self.parameter+":"+str(value),"utf-8")
                print(message)
                pack = [message]

                print("before write")
                yield from self.stream.write(pack)
                print("after write")

            yield from asyncio.sleep(10)

test = Publisher()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.main())

subscriber.py

#!/usr/bin/env python

import zmq

class Subscriber:
    XSUB_CONNECT = 'tcp://localhost:10000'

    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.XSUB)
        self.socket.connect(Subscriber.XSUB_CONNECT)

    def loop(self):
        print(self.socket.recv())
        self.socket.close()

    def subscribe(self, parameter):
        self.socket.send_string('\x01'+parameter)
        print("Subscribed to parameter "+parameter)

    def unsubscribe(self, parameter):
        self.socket.send_string('\x00'+parameter)
        print("Unsubscribed to parameter "+parameter)

test = Subscriber()
test.subscribe("London")
while True:
    print(test.socket.recv())

这是输出:

订阅方:

$ python3 subscriber.py 
    Subscribed to parameter London
    b'London:288.15'

发布方:

$ python3 publisher.py 
    before wait
    Entered subscriptions coroutine
    New iteration of subscriptions loop
    Entered publish coroutine
    subscription request received for parameter London
    New iteration of subscriptions loop
    New iteration of publish loop
    bytearray(b'London:288.15')
    before write

程序卡在那里。

如您所见,"before write" 出现在输出中并且消息被发送,但是 "after write" 没有出现。因此,我认为可能在 self.stream.write(pack) 调用堆栈中的某处引发并捕获了异常。

如果我向发布者发送一个KeyboardInterrupt,我得到的是:

Traceback (most recent call last):
  File "publisher.py", line 73, in <module>
    loop.run_until_complete(test.main())
  File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.4/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "publisher.py", line 66, in publish
    yield from self.stream.write(pack)
TypeError: 'NoneType' object is not iterable
Task was destroyed but it is pending!
task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>

所以我想我的问题实际上是这个错误:TypeError: 'NoneType' object is not iterable,但我不知道是什么原因造成的。

这里出了什么问题?

最佳答案

问题是您试图yield from调用self.stream.write(),但是stream.write isn't actually a coroutine .当您对项目调用 yield from 时,Python 会在内部调用 iter(item)。在这种情况下,对 write() 的调用返回 None,因此 Python 正在尝试执行 iter(None) - 因此出现异常看。

要修复它,您应该像普通函数一样调用write()。如果你真的想等到 write 被刷新并发送给读者,使用 yield from stream.drain()在调用 write() 之后:

print("before write")
self.stream.write(pack)
yield from self.stream.drain()
print("after write")

此外,要确保在不需要 Ctrl+C 的情况下在 publish 中引发异常,请使用 asyncio.gather 而不是 asyncio.wait:

    yield from asyncio.gather(*tasks)

使用 asyncio.gathertasks 中的任务抛出的任何异常都将被重新引发。

关于Python Asyncio 阻塞协程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31050056/

相关文章:

python - 将用户输入直接送入 int 数组 PYTHON

zeromq - ZeroMQ消息大小长度限制?

python - pyzmq: ROUTER 套接字是否有类似 "sendto"的函数的快捷方式?

swift - 如何在swift的后台进程子函数中将值返回给函数

message-queue - ZeroMQ 消息队列

反转嵌套字典的 Pythonic 方式

python - 如何动态创建与另一个函数具有相同签名的函数?

python - 连接/处理脚本到 PySimpleGUI 按钮

vb.net - 异步 ProcessStartInfo : Run cmd program to show in textbox just like cmd window in real time

javascript - Angular 异步数据