上下文: 我正在设计一个应用程序,它将使用来自各种 Amazon SQS 队列的消息。 (超过25个队列) 为此,我正在考虑创建一个库来消费来自队列的消息(称之为 MessageConsumer)
我希望根据队列中的流量动态分配线程来接收/处理来自不同队列的消息,以最大程度地减少资源浪费。 我有两种方法可以解决这个问题。
1) 只能有一种类型的线程来轮询队列、接收消息并处理这些消息,并且所有队列都有一个公共(public)线程池。 2) 可以有单独的轮询和工作线程。
在第二种情况下,我将拥有公共(public)工作线程池和每个队列的恒定数量的轮询器。
编辑: 详细说明第二种情况: 我计划为每个队列设置 1 个连续运行的线程,以获取该队列中的消息量。然后有一些逻辑根据每个队列中的消息数和队列的优先级来决定每个队列所需的轮询线程数。
我不希望轮询线程一直运行,因为这可能会导致空接收(sqs.receiveMessages()),所以我将根据流量分配轮询线程。 高流量队列将有更多的轮询线程,因此更多的作业被提交到工作线程池。
请对此设计提出任何改进或缺陷?
最佳答案
推荐的流程是:
- 工作人员使用长轮询来轮询队列(这意味着它将等待最多 20 秒,然后返回空响应)
- 每次调用
ReceiveMessage()
时,他们可以请求最多 10 条消息 - 工作人员处理消息
- 工作人员从队列中删除消息
- 重复
如果您希望扩大工作线程数量,可以基于 Amazon CloudWatch 中的 ApproximateNumberOfMessagesVisible
指标。如果数字过高,请添加一名工作人员。如果它下降到零(或低于某个阈值),删除 worker 。
让每个工作人员只轮询一个队列可能是最简单的。
不需要“轮询器”。 worker 们自己进行投票。通过这种方式,您可以独立扩展工作人员,而不需要一些中央“轮询”服务来尝试管理这一切。只需启动一个新的 Amazon EC2 实例,启动一些工作线程,它们就会开始处理消息。当扩展时,只需终止工作人员甚至实例 - 再次,无需使用中央“轮询”服务注册/取消注册工作人员。
关于java - 为什么要使用单独的轮询线程和工作线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61201916/