我正在尝试使用 Reactive Extensions (Rx) 在任务完成时缓冲这些任务的枚举。有谁知道是否有一种干净的内置方法可以做到这一点? ToObservable
扩展方法只会生成一个 IObservable<Task<T>>
,这不是我想要的,我想要一个 IObservable<T>
,然后我可以使用 Buffer
上。
人为的例子:
//Method designed to be awaitable
public static Task<int> makeInt()
{
return Task.Run(() => 5);
}
//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer
public static void Main()
{
//Make a bunch of tasks
IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());
//Is there a built in way to turn this into an Observable that I can then buffer?
IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????
buffered.Subscribe(ints => {
Console.WriteLine(ints.Count()); //Should be 15
});
}
最佳答案
您可以使用 Task
可以使用 another overload of ToObservable()
转换为 observable 的事实.
当您拥有一组(单项)可观察对象时,您可以使用 Merge()
创建一个包含项目完成的可观察对象.
因此,您的代码可能如下所示:
futureInts.Select(t => t.ToObservable())
.Merge()
.Buffer(15)
.Subscribe(ints => Console.WriteLine(ints.Count));
关于c# - 将 IEnumerable<Task<T>> 转换为 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17051060/