我一直在尝试更深入地了解 Rx,通过关注 Bart De Smetts MinLinq 和 Jon Skeets 的“重新实现”系列,我已经建立了很好的理解,但是......
以下代码为例
var onePerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var evenNums = onePerSecond.Where(x => x % 2 == 0);
evenNums.Subscribe(Console.WriteLine);
从等效的 IEnumerable 角度来看,我了解 MoveNext/Current 的数据流,并且从 Mr Skeets 的博客中了解如何通过在 IEnumerable 'this' 参数上使用 foreach 来实现Where 方法扩展方法。
但是在 IObservable 的Where方法的情况下,它是否包含实现IObserver接口(interface)(或lambda等价物)的代码,因此有效地观察来自onePerSecond对象的所有通知,并反过来返回一个仅包含以下值的IObservable谓词被发现为真?
非常欢迎任何帮助和想法,非常感谢
詹姆斯
最佳答案
通过使用 ILSpy 查看源代码,很容易看出这正是Where 的实现方式。它返回一个新的可观察量,该可观察量根据您传递的谓词过滤项目:
public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (predicate == null)
{
throw new ArgumentNullException("predicate");
}
return new AnonymousObservable<TSource>((IObserver<TSource> observer) => source.Subscribe(delegate(TSource x)
{
bool flag;
try
{
flag = predicate(x);
}
catch (Exception error)
{
observer.OnError(error);
return;
}
if (flag)
{
observer.OnNext(x);
}
}
, new Action<Exception>(observer.OnError), new Action(observer.OnCompleted)));
}
关于c#-4.0 - IObservable 的Where() 扩展方法是如何实现的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9994360/