我对异步编程非常陌生,我有一个 sqs channel ,我必须从中读取消息,但在这之间我必须启动我的网络服务器,并在网络服务器启动后立即执行相同的接收消息任务。
运行下面的程序只会继续读取 sqs 消息,调用永远不会转到网络服务器函数。我怎样才能使以下案例发挥作用:
import boto3
import json
import logging
import asyncio
from aiohttp import web
app = web.Application()
max_queue_messages = 10
AWS_REGION='***'
AWS_KEY='***'
AWS_SECRET='***'
sqs = boto3.client('sqs', region_name=AWS_REGION,
aws_access_key_id=AWS_KEY,
aws_secret_access_key=AWS_SECRET)
queue_url = 'queuename'
async def start_queue():
while True:
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=[
'SentTimestamp'
],
MaxNumberOfMessages=1,
MessageAttributeNames=[
'All'
],
VisibilityTimeout=0,
WaitTimeSeconds=0
)
if 'Messages' in response:
try:
message = response['Messages']
payload_dict = {}
payload_dict['payload'] = message[0]['Body']
print("mesage from queue:")
except Exception as e:
logging.error(f'[sqs] error no message in queue -> {e}')
else:
time.sleep()
return payload_dict
async def webserver():
print("Starting web Server")
web.run_app(app,host= "127.0.0.1",port= 5000)
async def func1():
await start_queue()
await webserver()
if __name__ == "__main__":
asyncio.run(func1())
最佳答案
您正在直接将 poll_queue 作为协程运行。使用ensure_future()执行它,它将把协程包装在一个Task对象中。事件循环只能等待和挂起 Task 对象。
import aiobotocore
import json
import logging
import asyncio
from aiohttp import web
app = web.Application()
max_queue_messages = 10
AWS_REGION=
AWS_KEY=
AWS_SECRET=
queue_url =
async def poll_queue(client):
while True:
try:
# This loop wont spin really fast as there is
# essentially a sleep in the receieve_message call
response = await client.receive_message(
QueueUrl=queue_url,
WaitTimeSeconds=2,
)
if 'Messages' in response:
for msg in response['Messages']:
# print('Got msg "{0}"'.format(msg['Body']))
print('got queue message')
else:
print('No messages in queue')
except KeyboardInterrupt:
break
print('Finished')
await client.close()
def func1():
loop = asyncio.get_event_loop()
session = aiobotocore.get_session(loop=loop)
client = session.create_client('sqs', region_name=AWS_REGION,
aws_access_key_id=AWS_KEY,
aws_secret_access_key=AWS_SECRET)
asyncio.ensure_future(poll_queue(client))
async def hello(request):
return web.Response(text="Hello, world")
def func2():
app.add_routes([web.get('/', hello)])
print("Starting web Server")
web.run_app(app, host= "127.0.0.1",port= 5000)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
func1()
func2()
loop.run_forever()
关于python - sqs 的异步等待接收消息无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57284659/