c# - 将 IEnumerable<Task<T>> 转换为 IObservable<T>

标签 c# .net task-parallel-library system.reactive

我正在尝试使用 Reactive Extensions (Rx) 在任务完成时缓冲这些任务的枚举。有谁知道是否有一种干净的内置方法可以做到这一点? ToObservable扩展方法只会生成一个 IObservable<Task<T>> ,这不是我想要的,我想要一个 IObservable<T> ,然后我可以使用 Buffer上。

人为的例子:

//Method designed to be awaitable
public static Task<int> makeInt()
{
     return Task.Run(() => 5);
}

//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{
    //Make a bunch of tasks
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

    //Is there a built in way to turn this into an Observable that I can then buffer?
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????

    buffered.Subscribe(ints => {
        Console.WriteLine(ints.Count()); //Should be 15
    });
}

最佳答案

您可以使用 Task 可以使用 another overload of ToObservable() 转换为 observable 的事实.

当您拥有一组(单项)可观察对象时,您可以使用 Merge() 创建一个包含项目完成的可观察对象.

因此,您的代码可能如下所示:

futureInts.Select(t => t.ToObservable())
          .Merge()
          .Buffer(15)
          .Subscribe(ints => Console.WriteLine(ints.Count));

关于c# - 将 IEnumerable<Task<T>> 转换为 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17051060/

相关文章:

c# - 在 C# 中使用动态 "binding expression"格式化字符串

c# - 如何将 Parallel.ForEach 的结果发布到在 C# 中不断读取的队列

c# - PLinq 本质上比 System.Threading.Tasks.Parallel.ForEach 快吗

c# - 异步任务方法 WaitingForActivation

c# - 在 C# 中,我应该在哪里保存计时器的引用?

c# - StringBuilder.Append 与 StringBuilder.AppendFormat

c# - ASP.Net MVC 2.0 : EditorFor setting name via attributes

c# - 将多种文件类型保存到一个文件中

c# - 无法将解析的对象转换为通用抽象 autofac C#

c# - VS2010 - 找不到引用的组件 'X'。 - 禁用自动 "add reference"