python - sqs 的异步等待接收消息无法正常工作

标签 python asynchronous boto3 python-asyncio

我对异步编程非常陌生,我有一个 sqs channel ,我必须从中读取消息,但在这之间我必须启动我的网络服务器,并在网络服务器启动后立即执行相同的接收消息任务。

运行下面的程序只会继续读取 sqs 消息,调用永远不会转到网络服务器函数。我怎样才能使以下案例发挥作用:

import boto3
import json
import logging
import asyncio
from aiohttp import web
app = web.Application()

max_queue_messages = 10
AWS_REGION='***'
AWS_KEY='***'
AWS_SECRET='***'

sqs = boto3.client('sqs', region_name=AWS_REGION,
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET)


queue_url = 'queuename'


async def start_queue():
    while True:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            AttributeNames=[
                'SentTimestamp'
            ],
            MaxNumberOfMessages=1,
            MessageAttributeNames=[
                'All'
            ],
            VisibilityTimeout=0,
            WaitTimeSeconds=0
        )
        if 'Messages' in response:
            try:
                message = response['Messages']
                payload_dict = {}
                payload_dict['payload'] = message[0]['Body']
                print("mesage from queue:")
            except Exception as e:
                logging.error(f'[sqs] error no message in queue -> {e}')
        else:
            time.sleep()
    return payload_dict

async def webserver():
    print("Starting web Server")
    web.run_app(app,host= "127.0.0.1",port= 5000)


async def func1():

    await start_queue()
    await webserver()

if __name__ == "__main__":
    asyncio.run(func1())

最佳答案

您正在直接将 poll_queue 作为协程运行。使用ensure_future()执行它,它将把协程包装在一个Task对象中。事件循环只能等待和挂起 Task 对象。

import aiobotocore
import json
import logging
import asyncio
from aiohttp import web
app = web.Application()

max_queue_messages = 10
AWS_REGION=
AWS_KEY=
AWS_SECRET=

queue_url =

async def poll_queue(client):
    while True:
        try:
            # This loop wont spin really fast as there is
            # essentially a sleep in the receieve_message call
            response = await client.receive_message(
                QueueUrl=queue_url,
                WaitTimeSeconds=2,
            )

            if 'Messages' in response:
                for msg in response['Messages']:
                    # print('Got msg "{0}"'.format(msg['Body']))
                    print('got queue message')
            else:
                print('No messages in queue')
        except KeyboardInterrupt:
            break

    print('Finished')
    await client.close()


def func1():
    loop = asyncio.get_event_loop()
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('sqs', region_name=AWS_REGION,
                               aws_access_key_id=AWS_KEY,
                               aws_secret_access_key=AWS_SECRET)
    asyncio.ensure_future(poll_queue(client))

async def hello(request):
    return web.Response(text="Hello, world")

def func2():
    app.add_routes([web.get('/', hello)])
    print("Starting web Server")
    web.run_app(app, host= "127.0.0.1",port= 5000)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)

    func1()
    func2()

    loop.run_forever()

关于python - sqs 的异步等待接收消息无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57284659/

相关文章:

ios - 网络运行后获取小区计数

python - 使用 S3 元数据调用 kms.decrypt 时出现 InvalidCiphertextException

aws-lambda - Lambda 无法向 SQS 发送消息

python - Pandas:在PeriodIndex之后查询

python - 在 Pandas 中将频率字符串转换为 DateOffset

python - 使用 pd.merge 映射两个数据帧

python - Amazon Cognito 中使用 boto3 和授权的 DEVICE_PASSWORD_VERIFIER 质询响应

python - 如何在 OperaDriver 中启用内置 VPN?

ios - Swift按顺序执行异步任务

entity-framework - EF Core 中使用注入(inject) DbContext 的并行异步调用的最佳实践是什么?