我有一个返回异步枚举器的方法
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var item in ListOfWorkItems)
{
yield return DoWork(item);
}
}
和来电者: public async Task LogResultsAsync()
{
await foreach (var result in DoWorkAsync())
{
Console.WriteLine(result);
}
}
因为DoWork
是一项昂贵的操作,我更愿意以某种方式并行化它,因此它的工作方式类似于: public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
Parallel.ForEach(ListOfWorkItems, item =>
{
yield return DoWork(item);
});
}
但是我不能从内部进行 yield 返回 Parallel.Foreach
所以只是想知道最好的方法是什么?返回结果的顺序无关紧要。
谢谢。
编辑:抱歉,我在
DoWorkAsync
中遗漏了一些代码,它确实在等待我只是没有把它放在上面的代码中的东西,因为这与问题不太相关。现已更新编辑2:
DoWork
就我而言,主要是 I/O 绑定(bind),它从数据库中读取数据。
最佳答案
这是一个使用 TransformBlock
的基本实现。来自 TPL Dataflow图书馆:
public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
// Define the dataflow block
var block = new TransformBlock<IWorkItem, IResult>(async item =>
{
return await TransformAsync(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10, // the default is 1
EnsureOrdered = false // the default is true
});
// Feed the block with input data
foreach (var item in workItems)
{
block.Post(item);
}
block.Complete();
// Stream the block's output as IAsyncEnumerable
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var result))
{
yield return result;
}
}
// Propagate possible exceptions
await block.Completion;
}
这个实现并不完美,因为万一 IAsyncEnumerable
的使用者过早地放弃枚举,TransformBlock
将继续在后台工作,直到处理完所有工作项。它也不支持取消,这都是值得尊敬的IAsyncEnumerable
制作方法应该支持。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看 this问题。
关于c# - 在 C#8 IAsyncEnumerable<T> 中并行化 yield 返回,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63391097/