c# - 以最大并行度将 IEnumerable<T> 转换为 IObservable<T>

标签 c# async-await system.reactive

我有一系列异步任务要做(比如,获取 N 个网页)。现在我想要的是将它们全部公开为 IObservable<T> 。我当前的解决方案使用 this question 的答案:

async Task<ResultObj> GetPage(string page) {
    Console.WriteLine("Before");
    var result = await FetchFromInternet(page);
    Console.WriteLine("After");
    return result;
}

// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable =pages.Select(GetPage).
                 Select(t => Observable.FromAsync(() => t)).Merge();

// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
    Console.WriteLine(obj.ToString());
}

问题是我不知道要获取的页面数量,而且可能很大。我不想同时发出数百个请求。所以我想要一种方法来限制将并行执行的最大任务数。有没有办法限制并发调用 GetPage 的数量?

有一个 Merge 重载,它接受一个 maxConcurrent 参数,但它似乎并没有真正限制函数调用的并发。控制台在 After 消息之前打印所有 Before 消息。

注意:我需要转换回 IEnumerable<T> 。我正在为一个系统编写一个数据源,该系统为我提供要获取的数据描述符,我需要将已下载数据的列表还给它。

最佳答案

编辑

以下应该有效。 This overload限制并发订阅的数量。

var resultObservable = pages
  .Select(p => Observable.FromAsync(() => GetPage(p)))
  .Merge(maxConcurrent);

解释

为了理解为什么需要进行此更改,我们需要一些背景知识

  1. FromAsync 返回一个将调用传递的 Func 的可观察对象 every time it is subscribed to .这意味着如果从未订阅可观察对象,则永远不会调用它。

  2. Merge 急切地读取源序列,并且只同时订阅 n 个可观察对象。

通过这两部分我们可以知道为什么原始版本将并行执行所有内容:因为 (2),GetPage 将在 时为所有源字符串调用Merge 决定需要订阅多少个 observable。

我们还可以看到为什么第二个版本有效:即使序列已经完全迭代,(1) 意味着 GetPageMerge 决定之前不会被调用它需要订阅 n observables。这导致仅 n 任务同时执行的预期结果。

关于c# - 以最大并行度将 IEnumerable<T> 转换为 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25436542/

相关文章:

c# - mvc 中的 ON DUPLICATE KEY UPDATE 问题

c# - 在 mvc core 2.1.3 API 中使用 FormatFilter 属性

C# 在执行计算时丢失小数分辨率

c# - 从单个字符串列请求 1..N 值时,QueryAsync() 返回什么?

c# - Task 与 Task<TResult> 的不同行为

wpf - Rx DragBehavior->为什么我的元素不动

ios - ReactiveX RxSwift 从可观察对象的连接中获得第一个非错误

c# - 生成代码中的代码分析警告(如迁移)- Visual Studio (C#)

c# - 在单个按钮上协调多个 IObservable

node.js - 在 for 循环中等待异步函数