我有一系列异步任务要做(比如,获取 N 个网页)。现在我想要的是将它们全部公开为 IObservable<T>
。我当前的解决方案使用 this question 的答案:
async Task<ResultObj> GetPage(string page) {
Console.WriteLine("Before");
var result = await FetchFromInternet(page);
Console.WriteLine("After");
return result;
}
// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable =pages.Select(GetPage).
Select(t => Observable.FromAsync(() => t)).Merge();
// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
Console.WriteLine(obj.ToString());
}
问题是我不知道要获取的页面数量,而且可能很大。我不想同时发出数百个请求。所以我想要一种方法来限制将并行执行的最大任务数。有没有办法限制并发调用 GetPage
的数量?
有一个 Merge
重载,它接受一个 maxConcurrent 参数,但它似乎并没有真正限制函数调用的并发。控制台在 After 消息之前打印所有 Before 消息。
注意:我需要转换回 IEnumerable<T>
。我正在为一个系统编写一个数据源,该系统为我提供要获取的数据描述符,我需要将已下载数据的列表还给它。
最佳答案
编辑
以下应该有效。 This overload限制并发订阅的数量。
var resultObservable = pages
.Select(p => Observable.FromAsync(() => GetPage(p)))
.Merge(maxConcurrent);
解释
为了理解为什么需要进行此更改,我们需要一些背景知识
FromAsync
返回一个将调用传递的Func
的可观察对象 every time it is subscribed to .这意味着如果从未订阅可观察对象,则永远不会调用它。Merge
急切地读取源序列,并且只同时订阅n
个可观察对象。
通过这两部分我们可以知道为什么原始版本将并行执行所有内容:因为 (2),GetPage
将在 时为所有源字符串调用Merge
决定需要订阅多少个 observable。
我们还可以看到为什么第二个版本有效:即使序列已经完全迭代,(1) 意味着 GetPage
在 Merge
决定之前不会被调用它需要订阅 n
observables。这导致仅 n
任务同时执行的预期结果。
关于c# - 以最大并行度将 IEnumerable<T> 转换为 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25436542/