我在我的项目中使用SQS。我想在将消息推送到 SQS 标准队列时提供超时。我所说的超时(不是可见性超时)是指,如果我的消息在 200 毫秒内没有推送到 SQS,那么我会在一段时间后重试推送它。我正在使用 JAVA API 来实现同样的目的。
最初,我尝试通过请求可调用任务将消息推送到 SQS 来自己处理超时,它返回了一个 future 对象,我通过提供超时来等待使用它。但这导致请求被发送到 SQS,在推送消息时,我的超时出现,在收到此已推送消息的响应之前,我的代码认为由于这个 future 对象而超时。
当我查看 SQS 的文档时,我没有找到与调用 SQS 推送、拉取或删除时提供请求超时相关的任何内容。
final Future<ISendMessageResult> future = timeoutHelperThreadPool.getExecutor().submit(() -> {
return getQueueStore().sendMessage(request).get();
});
try {
sendMessageResult = future.get(200, TimeUnit.MILLISECONDS);
logger.info("SQS_PUSH_SUCCESSFUL");
return true;
} catch (final TimeoutException e) {
logger.error("SQS_PUSH_TIMEOUT_EXCEPTION");
}
无论如何,我是否可以确保如果我收到超时,我的消息不会被推送到 SQS,反之亦然?
最佳答案
我建议您要么解决了错误的问题,要么试图解决本质上不可能的问题,这可能是由于不熟悉向 SQS 推送消息时发生的底层操作的复杂性。
所有这些操作都需要非零的时间,并且在某种程度上都是可变的......
SQS 请求转换为 HTTP 请求。
请求已签名。
超过此点后,您将有效地 promise 在没有发生故障的情况下发送消息。
对于每条消息,接下来的三个步骤可能需要也可能不需要发生。这取决于您最近是否已经与服务进行过交互,这可能意味着您有缓存的 DNS 响应,甚至是建立了 TLS 的可重用 TCP 连接,在 HTTP keep-alive 中等待下一个请求,但这是一个通配符。
DNS 查询查找 SQS 端点。
已与端点建立 TCP 连接。
建立 TLS (SSL) session 。
HTTP 请求通过线路发送。
此时,您仍在等待,时钟仍在滴答作响,但您现在无法采取任何措施来阻止请求的处理,并且仍然有很多工作要做。关闭连接可能会导致服务停止处理请求,但没有理由期望它会发生这种情况,因为服务在尝试写入响应之前不太可能看到连接已关闭...在这一点上,它是无关紧要的。
通过线路接收 HTTP 请求。 (此项与前一项分开列出,因为在线传输请求也需要非零时间。)
服务检查请求是否过期。
服务检查请求是否正确签名。
该服务检查签名用户是否有权执行请求的操作。
服务实际上处理请求的操作。
服务生成响应。
响应通过线路发送。
通过线路收到响应。
响应由客户端解析。
与任何网络服务本质上一样,一旦请求到达目的地,事情就脱离了您的控制。
tl;dr:在超时的情况下,不可能实现任何级别的保证消息未发送。
另请参阅the Two Generals' Problem .
然而,这是SQS FIFO队列解决的问题之一。 FIFO 队列支持由您为每条消息分配消息重复数据删除 ID,这允许您在最长 5 分钟的时间内安全重试向 SQS 传送消息。在此时间窗口内,如果您确实将相同的消息发送到同一队列(可能是由于超时引入的歧义),则重复的消息将被服务丢弃,但响应会告诉您传递成功,因为它确实...早些时候。
The message deduplication ID is the token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.
关于java - 在将消息推送到 SQS 时提供请求超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56204962/