我正在尝试创建一个 AWS SQS Windows 服务使用者,它将以 10 条为一组轮询消息。每条消息都将在其自己的任务中执行,以实现并行执行。消息处理包括调用不同的 api 和发送电子邮件,因此可能需要一些时间。
我的问题是,首先,我只想在可以立即处理 10 条消息时轮询队列。这是由于 sqs 可见性超时,并且接收到的消息“等待”可能会超过可见性超时并“返回”队列。这会产生重复。我不认为调整可见性超时是好的,因为消息仍然有可能被重复,而这正是我试图避免的。其次,我希望对并行性有某种限制(例如,最大限制为 100 个并发任务),这样服务器资源就可以不受限制,因为服务器中还运行着其他应用程序。
如何实现这一目标?或者有其他方法可以解决这些问题吗?
最佳答案
这个答案做出以下假设:
- 从 AWS 获取消息应进行序列化。仅应并行处理消息。
- 应处理从 AWS 获取的每条消息。在所有获取的消息有机会被处理之前,整个执行不应终止。
- 每个消息处理操作都应该等待。在所有启动的任务完成之前,整个执行不应终止。
- 在处理消息期间发生的任何错误都应被忽略。整个执行不应因为单个消息的处理失败而终止。
- 从 AWS 获取消息期间发生的任何错误都应该是致命的。整个执行应该终止,但不会在所有当前运行的消息处理操作完成之前终止。
- 执行机制应该能够处理从 AWS 获取操作返回的批处理消息数量与请求数量不同的情况。
下面是一个(希望)满足这些要求的实现:
/// <summary>
/// Starts an execution loop that fetches batches of messages sequentially,
/// and process them one by one in parallel.
/// </summary>
public static async Task ExecutionLoopAsync<TMessage>(
Func<int, Task<TMessage[]>> fetchMessagesAsync,
Func<TMessage, Task> processMessageAsync,
int fetchCount,
int maxDegreeOfParallelism,
CancellationToken cancellationToken = default)
{
// Arguments validation omitted
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
// Count how many times we have acquired the semaphore, so that we know
// how many more times we have to acquire it before we exit from this method.
int acquiredCount = 0;
try
{
while (true)
{
Debug.Assert(acquiredCount == 0);
for (int i = 0; i < fetchCount; i++)
{
await semaphore.WaitAsync(cancellationToken);
acquiredCount++;
}
TMessage[] messages = await fetchMessagesAsync(fetchCount)
?? Array.Empty<TMessage>();
for (int i = 0; i < messages.Length; i++)
{
if (i >= fetchCount) // We got more messages than we asked for
{
await semaphore.WaitAsync();
acquiredCount++;
}
ProcessAndRelease(messages[i]);
acquiredCount--;
}
if (messages.Length < fetchCount)
{
// We got less messages than we asked for
semaphore.Release(fetchCount - messages.Length);
acquiredCount -= fetchCount - messages.Length;
}
// This method is 'async void' because it is not expected to throw ever
async void ProcessAndRelease(TMessage message)
{
try { await processMessageAsync(message); }
catch { } // Swallow exceptions
finally { semaphore.Release(); }
}
}
}
catch (SemaphoreFullException)
{
// Guard against the (unlikely) scenario that the counting logic is flawed.
// The counter is no longer reliable, so skip the awaiting in finally.
acquiredCount = maxDegreeOfParallelism;
throw;
}
finally
{
// Wait for all pending operations to complete. This could cause a deadlock
// in case the counter has become out of sync.
for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
await semaphore.WaitAsync();
}
}
使用示例:
var cts = new CancellationTokenSource();
Task executionTask = ExecutionLoopAsync<Message>(async count =>
{
return await GetBatchFromAwsAsync(count);
}, async message =>
{
await ProcessMessageAsync(message);
}, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);
关于c# - Aws Sqs Consumer - 仅在可以立即处理消息时进行轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65850128/