c#-4.0 - IObservable 的Where() 扩展方法是如何实现的?

标签 c#-4.0 functional-programming system.reactive

我一直在尝试更深入地了解 Rx,通过关注 Bart De Smetts MinLinq 和 Jon Skeets 的“重新实现”系列,我已经建立了很好的理解,但是......

以下代码为例

var onePerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var evenNums = onePerSecond.Where(x => x % 2 == 0);
evenNums.Subscribe(Console.WriteLine);

从等效的 IEnumerable 角度来看,我了解 MoveNext/Current 的数据流,并且从 Mr Skeets 的博客中了解如何通过在 IEnumerable 'this' 参数上使用 foreach 来实现Where 方法扩展方法。

但是在 IObservable 的Where方法的情况下,它是否包含实现IObserver接口(interface)(或lambda等价物)的代码,因此有效地观察来自onePerSecond对象的所有通知,并反过来返回一个仅包含以下值的IObservable谓词被发现为真?

非常欢迎任何帮助和想法,非常感谢

詹姆斯

最佳答案

通过使用 ILSpy 查看源代码,很容易看出这正是Where 的实现方式。它返回一个新的可观察量,该可观察量根据您传递的谓词过滤项目:

public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }
    return new AnonymousObservable<TSource>((IObserver<TSource> observer) => source.Subscribe(delegate(TSource x)
    {
        bool flag;
        try
        {
            flag = predicate(x);
        }
        catch (Exception error)
        {
            observer.OnError(error);
            return;
        }
        if (flag)
        {
            observer.OnNext(x);
        }
    }
    , new Action<Exception>(observer.OnError), new Action(observer.OnCompleted)));
}

关于c#-4.0 - IObservable 的Where() 扩展方法是如何实现的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9994360/

相关文章:

当我使用 TimeSpan 时,MySQL 连接器抛出异常只有 TimeSpan 对象可以被 MySqlTimeSpan 序列化

c# - 使用 C# ASP.NET MVC 使用 WCF Rest 服务 (JSON)

haskell - 是否存在使用代数数据类型或多态性的 OOP 抽象类的 Haskell 等效项?

java - 如何使用 vavr 仅记录特定异常

java - Kotlin 序列 : filter + find first + map

c# - 响应式扩展订阅调用等待

c# - 扩展数组的最快方法

c# - 具有值类型键和引用类型值的通用字典

c# - 在 .NET Standard 库中访问 Dispatcher IScheduler

system.reactive - Rx.Net - 获取股票价格变化并处理它们