c# - 如何枚举 IAsyncEnumerable<T> 并为每个元素调用异步操作,从而允许每个迭代/操作对并发?

标签 c# concurrency async-await iasyncenumerable

我有一个IAsyncEnumerable<string>包含从网络下载的数据的流,我想将每条数据异步保存在 SQL 数据库中。所以我用了 ForEachAwaitAsync System.Linq.Async 的扩展方法图书馆。我的问题是下载和保存每条数据是按顺序发生的,而我更希望它同时发生。

澄清一下,我不想同时下载多于一份的数据,也不想同时保存多于一份的数据。我想要的是,当我在数据库中保存一条数据时,应该同时从网络下载下一条数据。

下面是我当前解决方案的最小(人为的)示例。下载五个项目,然后将其保存在数据库中。下载每个项目需要 1 秒,保存又需要 1 秒:

async IAsyncEnumerable<string> GetDataFromWeb()
{
    foreach (var item in Enumerable.Range(1, 5))
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
        await Task.Delay(1000); // Simulate an I/O-bound operation
        yield return item.ToString();
    }
}

var stopwatch = Stopwatch.StartNew();
await GetDataFromWeb().ForEachAwaitAsync(async item =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
    await Task.Delay(1000); // Simulate an I/O-bound operation
});
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");

代码可以工作,但不是我想要的方式。总持续时间约为 10 秒,而不是理想的 ~6 秒。

实际不需要的输出:

04:55:50.526 > Downloading #1
04:55:51.595 > Saving #1
04:55:52.598 > Downloading #2
04:55:53.609 > Saving #2
04:55:54.615 > Downloading #3
04:55:55.616 > Saving #3
04:55:56.617 > Downloading #4
04:55:57.619 > Saving #4
04:55:58.621 > Downloading #5
04:55:59.622 > Saving #5
Duration: 10,115 msec

假设的理想输出:

04:55:50.000 > Downloading #1
04:55:51.000 > Saving #1
04:55:51.000 > Downloading #2
04:55:52.000 > Saving #2
04:55:52.000 > Downloading #3
04:55:53.000 > Saving #3
04:55:53.000 > Downloading #4
04:55:54.000 > Saving #4
04:55:54.000 > Downloading #5
04:55:55.000 > Saving #5
Duration: 6,000 msec

我正在考虑实现一个名为 ForEachConcurrentAsync 的自定义扩展方法,与前述 ForEachAwaitAsync 具有相同的签名方法,但具有允许枚举和对项目进行操作同时发生的行为。下面是该方法的一个 stub :

/// <summary>
/// Invokes and awaits an asynchronous action on each element in the source sequence.
/// Each action is awaited concurrently with fetching the sequence's next element.
/// </summary>
public static Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    // What to do?
}

如何实现此功能?

其他要求:

  1. 在取消或失败的情况下泄漏正在运行的任务是 Not Acceptable 。该方法完成时,所有启动的任务都应完成。
  2. 在枚举和操作都失败的极端情况下,只应传播两个异常之一,任意一个都可以。
  3. 该方法应该是真正异步的,并且不应阻塞当前线程(除非 action 参数包含阻塞代码,但这是调用者有责任防止的)。

说明:

  1. 如果保存数据比从网络下载数据花费的时间更长,则该方法不应继续提前下载更多项目。最多只能提前下载一份数据,同时保存前一份数据。

  2. IAsyncEnumerable<string>网络数据是这个问题的起点。我不想更改 IAsyncEnumerable<string> 的生成器方法。我想在枚举可枚举项时对其元素进行操作(通过将它们保存到数据库中)。

最佳答案

听起来您只需要跟踪上一个操作的任务并在下一个操作任务之前等待它。

public static async Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    Task previous = null;
    try
    {
        await source.ForEachAwaitAsync(async item =>
        {
            if(previous != null)
            {
                await previous;
            }

            previous = action(item);
        });
    }
    finally
    {
        if(previous != null)
        {
            await previous;
        }
    }
}

剩下的就是添加取消代码。

关于c# - 如何枚举 IAsyncEnumerable<T> 并为每个元素调用异步操作,从而允许每个迭代/操作对并发?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66192350/

相关文章:

c# - 如何在 mvc .net 上使用没有 "action is ambiguous"的基本 Controller 的 Action ?

c# - 更改 WPF 窗口的内容

java - Java中的多个生产者和消费者问题(没有BlockingQueue)

java - 线程和闪屏

vue.js - 如何将 Axios 调用循环与等待函数结合起来?

task-parallel-library - I/O 性能 - 异步 vs TPL vs Dataflow vs RX

c# - 使用 LINQ 将 List<string> 与 List<object> 匹配?

JavaScript 跳过 AJAX Post

java - 在 Java 中计算 ConcurrentHashmap 中的 delta

python - 这怎么是协程?