我试图枚举一个大 IEnumerable
一次,观察附有各种运算符的枚举( Count
、 Sum
、 Average
等)。显而易见的方法是将其转换为 IObservable
与方法 ToObservable
,然后订阅一个观察者。我注意到这比其他方法慢得多,比如做一个简单的循环并在每次迭代时通知观察者,或者使用 Observable.Create
方法而不是 ToObservable
.差异很大:它慢了 20-30 倍。就是这样,还是我做错了什么?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
输出:
ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec
.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, Console App, Release built
更新:这是我想要实现的实际功能的示例:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
输出:
Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5
与使用标准 LINQ 相比,此方法的重要区别运算符,是源可枚举只被枚举一次。
还有一个观察:使用
ToObservable(Scheduler.Immediate)
比 ToObservable()
稍快(约 20%) .
最佳答案
这就是表现良好的 observable 和“滚动你自己的,因为你认为更快更好,但它不是”可观察的区别。
当你深入到源代码中时,你会发现这条可爱的小线:
scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));
有效地调用
hasNext = enumerator.MoveNext();
每个预定的递归迭代一次。这允许您为您的
.ToObservable(schedulerOfYourChoice)
选择调度程序。称呼。使用您选择的其他选项,您创建了一系列对
.OnNext
的直接调用。那实际上什么都不做。 Method2
甚至没有 .Subscribe
称呼。两者
Method2
和 Method1
使用当前线程运行,并在订阅完成之前运行到完成。他们正在阻止调用。它们会导致竞争条件。Method1
是唯一一个表现良好的可观察对象。它是异步的,可以独立于订阅者运行。请记住,可观察对象是随时间运行的集合。它们通常具有异步源或计时器或对外部刺激的响应。他们不会经常跑掉一个简单的可枚举。如果您正在使用可枚举,那么同步工作应该会运行得更快。
速度不是 Rx 的目标。目标是对基于时间的推送值执行复杂查询。
关于c# - 为什么 IEnumerable.ToObservable 这么慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60987491/