java - 连续监听 AWS SQS 消息的模式

标签 java amazon-web-services sdk amazon-sqs

我有一个名为 QueueService 的简单类,其中一些方法包装了适用于 Java 的 AWS SQS 开发工具包中的方法。例如:

public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
        List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();

        ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
        for(Message message : messages) {
            Hashtable<String, String> resultItem = new Hashtable<String, String>();
            resultItem.put("MessageId", message.getMessageId());
            resultItem.put("ReceiptHandle", message.getReceiptHandle());
            resultItem.put("Body", message.getBody());
            resultList.add(resultItem);
        }
        return resultList;
    }

我还有另一个名为 App 的类,它有一个 main 并创建了一个 QueueService 实例。

我正在寻找一种“模式”来使 App 中的 main 监听队列中的新消息。现在我有一个 while(true) 循环,我在其中调用 receiveMessages 方法:

while(true) {
            messages = queueService.receiveMessages(queueURL); 
            for(Hashtable<String, String> message: messages) {
                String receiptHandle = message.get("ReceiptHandle");
                String messageBody = message.get("MessageBody");
                System.out.println(messageBody);
                queueService.deleteMessage(queueURL, receiptHandle);
            }
        }

这是正确的方法吗?我应该使用 SQS SDK 中的异步消息接收方法吗?

最佳答案

据我所知,Amazon SQS 无法支持主动监听器模型,在该模型中,Amazon SQS 会将消息“推送”给您的监听器,或者在有消息时调用您的消息监听器。

因此,您总是需要轮询消息。轮询支持两种轮询机制 - 短轮询和长轮询。每个都有自己的优点和缺点,但在大多数情况下,您通常最终会使用长轮询,尽管默认的轮询是短轮询。长轮询机制在网络流量方面绝对更有效,更具成本效益(因为亚马逊根据请求的数量向您收费),并且当您希望以时间敏感的方式处理消息时,它也是首选机制( ~= 尽快处理)。

围绕长轮询和短轮询还有更多值得了解的错综复杂之处,在这里解释所有这些有点困难,但如果您愿意,可以通过以下博客阅读更多关于此的详细信息。它还有一些代码示例,应该会有帮助。

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

就 while(true) 循环而言,我会说这取决于。 如果您正在使用长轮询,并且可以将等待时间设置为(最大)20 秒,这样如果没有消息,您轮询 SQS 的频率不会超过 20 秒。如果有消息,您可以决定是频繁轮询(消息一到达就处理)还是总是按时间间隔(比如每 n 秒)处理它们。

另一点需要注意的是,您可以在单个 receiveMessages 请求中读取多达 10 条消息,因此这也将减少您对 SQS 的调用次数,从而降低成本。正如上面的博客详细解释的那样,你可能会请求阅读 10 条消息,但即使队列中有那么多消息,它也可能不会返回 10。

但总的来说,如果您希望在运行时关闭轮询,我会说您需要构建适当的 Hook 和异常处理,以防您使用 while(true) 类型的结构。

要考虑的另一个方面是您是想在主应用程序线程中轮询 SQS,还是想生成另一个线程。因此,另一种选择是在主线程中创建一个带有单个线程的 ScheduledThreadPoolExecutor,以安排一个线程定期(每隔几秒)轮询 SQS,并且您可能不需要 while(true) 结构。

关于java - 连续监听 AWS SQS 消息的模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47911875/

相关文章:

java - 'opentracing: spring: web: ignoreAutoConfiguredSkipPatterns: true' 是做什么的?

linux - 如何在 AWS p2.xlarge 实例、AMI ami-edb11e8d 和最新的 nvidia 驱动程序 (375.39) 中的最新版本的 Tensorflow (1.0) 中安装 CUDA 8.0

windows-phone-8 - Visual Studio 2013 Professional 的 Sharepoint Phone SDK 安装

java - 在 Android 上获取 Cognito 凭证

amazon-web-services - AWS ECS 中的批处理系统与 AWS Batch 有何不同?

ios - 使用 Google map iOS sdk 在 swift 中创建自定义信息窗口?

android - 如何检测用户是否启用了密码或 PIN?

java - 数据库中存储密码的加密方法

java - 从 Python 调用 Java 代码的最佳方式是什么?

java - JPA @Embedded 注释是强制性的吗?