c# - 在 C#8 IAsyncEnumerable<T> 中并行化 yield 返回

标签 c# asynchronous iasyncenumerable

我有一个返回异步枚举器的方法

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in ListOfWorkItems)
        {
            yield return DoWork(item);
        }
    }
和来电者:
    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }
因为DoWork是一项昂贵的操作,我更愿意以某种方式并行化它,因此它的工作方式类似于:
    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        {
            yield return DoWork(item);
        });
    }
但是我不能从内部进行 yield 返回 Parallel.Foreach所以只是想知道最好的方法是什么?
返回结果的顺序无关紧要。
谢谢。
编辑:抱歉,我在 DoWorkAsync 中遗漏了一些代码,它确实在等待我只是没有把它放在上面的代码中的东西,因为这与问题不太相关。现已更新
编辑2: DoWork就我而言,主要是 I/O 绑定(bind),它从数据库中读取数据。

最佳答案

这是一个使用 TransformBlock 的基本实现。来自 TPL Dataflow图书馆:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
    // Define the dataflow block
    var block = new TransformBlock<IWorkItem, IResult>(async item =>
    {
        return await TransformAsync(item);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10, // the default is 1
        EnsureOrdered = false // the default is true
    });

    // Feed the block with input data
    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    // Propagate possible exceptions
    await block.Completion;
}
这个实现并不完美,因为万一 IAsyncEnumerable 的使用者过早地放弃枚举,TransformBlock将继续在后台工作,直到处理完所有工作项。它也不支持取消,这都是值得尊敬的IAsyncEnumerable制作方法应该支持。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看 this问题。

关于c# - 在 C#8 IAsyncEnumerable<T> 中并行化 yield 返回,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63391097/

相关文章:

c# - 从多个 IAsyncEnumerable 流并行接收数据

c# - 使用反射延迟初始化对象

C#:为什么 Initialize 不适用于字节数组?

c# - 使用 CancellationTokenSource

data-binding - WinRT ViewModel DataBind 到异步方法

c# - 如何使用 SqlDataReader 返回和使用 IAsyncEnumerable

c# - 西欧语言的模糊搜索算法(在我的例子中是瑞典语)

node.js - Node 从 Q.async 返回生成值

android - 在何处以及如何使用 fragment 填充我的选项卡

c# - IAsyncEnumerable 中缺少 await 运算符时的警告消息