c# - Aws Sqs Consumer - 仅在可以立即处理消息时进行轮询

标签 c# concurrency task-parallel-library amazon-sqs

我正在尝试创建一个 AWS SQS Windows 服务使用者,它将以 10 条为一组轮询消息。每条消息都将在其自己的任务中执行,以实现并行执行。消息处理包括调用不同的 api 和发送电子邮件,因此可能需要一些时间。

我的问题是,首先,我只想在可以立即处理 10 条消息时轮询队列。这是由于 sqs 可见性超时,并且接收到的消息“等待”可能会超过可见性超时并“返回”队列。这会产生重复。我不认为调整可见性超时是好的,因为消息仍然有可能被重复,而这正是我试图避免的。其次,我希望对并行性有某种限制(例如,最大限制为 100 个并发任务),这样服务器资源就可以不受限制,因为服务器中还运行着其他应用程序。

如何实现这一目标?或者有其他方法可以解决这些问题吗?

最佳答案

这个答案做出以下假设:

  1. 从 AWS 获取消息应进行序列化。仅应并行处理消息。
  2. 应处理从 AWS 获取的每条消息。在所有获取的消息有机会被处理之前,整个执行不应终止。
  3. 每个消息处理操作都应该等待。在所有启动的任务完成之前,整个执行不应终止。
  4. 在处理消息期间发生的任何错误都应被忽略。整个执行不应因为单个消息的处理失败而终止。
  5. 从 AWS 获取消息期间发生的任何错误都应该是致命的。整个执行应该终止,但不会在所有当前运行的消息处理操作完成之前终止。
  6. 执行机制应该能够处理从 AWS 获取操作返回的批处理消息数量与请求数量不同的情况。

下面是一个(希望)满足这些要求的实现:

/// <summary>
/// Starts an execution loop that fetches batches of messages sequentially,
/// and process them one by one in parallel.
/// </summary>
public static async Task ExecutionLoopAsync<TMessage>(
    Func<int, Task<TMessage[]>> fetchMessagesAsync,
    Func<TMessage, Task> processMessageAsync,
    int fetchCount,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

    // Count how many times we have acquired the semaphore, so that we know
    // how many more times we have to acquire it before we exit from this method.
    int acquiredCount = 0;
    try
    {
        while (true)
        {
            Debug.Assert(acquiredCount == 0);
            for (int i = 0; i < fetchCount; i++)
            {
                await semaphore.WaitAsync(cancellationToken);
                acquiredCount++;
            }

            TMessage[] messages = await fetchMessagesAsync(fetchCount)
                ?? Array.Empty<TMessage>();

            for (int i = 0; i < messages.Length; i++)
            {
                if (i >= fetchCount) // We got more messages than we asked for
                {
                    await semaphore.WaitAsync();
                    acquiredCount++;
                }
                ProcessAndRelease(messages[i]);
                acquiredCount--;
            }

            if (messages.Length < fetchCount)
            {
                // We got less messages than we asked for
                semaphore.Release(fetchCount - messages.Length);
                acquiredCount -= fetchCount - messages.Length;
            }

            // This method is 'async void' because it is not expected to throw ever
            async void ProcessAndRelease(TMessage message)
            {
                try { await processMessageAsync(message); }
                catch { } // Swallow exceptions
                finally { semaphore.Release(); }
            }
        }
    }
    catch (SemaphoreFullException)
    {
        // Guard against the (unlikely) scenario that the counting logic is flawed.
        // The counter is no longer reliable, so skip the awaiting in finally.
        acquiredCount = maxDegreeOfParallelism;
        throw;
    }
    finally
    {
        // Wait for all pending operations to complete. This could cause a deadlock
        // in case the counter has become out of sync.
        for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
            await semaphore.WaitAsync();
    }
}

使用示例:

var cts = new CancellationTokenSource();

Task executionTask = ExecutionLoopAsync<Message>(async count =>
{
    return await GetBatchFromAwsAsync(count);
}, async message =>
{
    await ProcessMessageAsync(message);
}, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);

关于c# - Aws Sqs Consumer - 仅在可以立即处理消息时进行轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65850128/

相关文章:

java - 有多少个线程可以同时调用一个对象的非同步方法?

c# - 如何用递归方法实现并行?

c# - 我什么时候应该使用 LongRunning 任务创建选项?

c# - Winrt - 根据出现的位置调整弹出窗口

c# - 无法将结构的 Marshal 指针转换回结构

c# - 在运行时检测接口(interface)的所有具体实现

python - Redis:如何确保购物案例的并发性和原子性?

events - Postgres : Post statement (or insert) asynchronous, 非阻塞处理

c# - 仅使用一小部分 CPU 在多个任务中运行 CPU 密集型方法?

c# - 对于 Int32 错误,值太大或太小