我有一个看起来像这样的架构:
- 一旦消息被发送到 SQS 队列,ECS 任务就会选择该消息并对其进行处理。
- 这意味着如果 X 条消息被发送到队列中,X 个 ECS 任务将并行启动。 ECS 任务只能获取一条消息(根据我上面的代码)
ECS 任务使用dockerized
Python 容器,并使用boto3
SQS client检索和解析 SQS 消息:
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
while sqs_message is not None:
# Process it
# Delete if from the queue
# Get next message in queue
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
def get_sqs_task_data(queue_url):
client = boto3.client('sqs')
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1
)
return response
def parse_sqs_message(response_sqs_message):
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
# ... parse it and return a dict
return {
data_1 = ...,
data_2 = ...
}
总而言之,非常简单。
在 get_sqs_data()
中,我明确指定我只想检索一条消息(因为 1 个 ECS 任务只需要处理一条消息)。
在 parse_sqs_message()
中,我测试队列中是否还有一些消息
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
当队列中只有一条消息时(即触发了一个 ECS 任务),一切正常。 ECS 任务能够选择消息、处理它并删除它。
但是,当队列中有 X 个消息(X > 1
)同时时,X 个 ECS 任务会被触发,但只有 ECS 任务能够获取其中一条消息并对其进行处理。
所有其他 ECS 任务都将退出并显示 No messages found in queue
,尽管还有 X - 1
消息需要处理。
这是为什么呢?为什么 others 任务无法选择剩下的消息?
如果重要的话,SQS 的 VisibilityTimeout
设置为 30 分钟。
如有任何帮助,我们将不胜感激!
如果需要,请随时要求更精确。
最佳答案
我忘了回答那个问题。
问题在于 SQS 被设置为 FIFO 队列。 FIFO 队列一次只允许一个消费者(以保持消息的顺序)。将其更改为普通(标准)队列可解决此问题。
关于python - ECS 任务只能从 SQS 队列中挑选一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54530667/