c# - 从头开始实现 IObservable<T>

标签 c# system.reactive

Reactive Extensions 附带了许多辅助方法,用于将现有事件和异步操作转换为可观察对象,但您将如何从头开始实现 IObservable

IEnumerable 具有可爱的 yield 关键字,使其非常易于实现。

什么是实现 IObservable 的正确方法?

我需要担心线程安全吗?

我知道支持在特定的同步上下文中回调,但这是我作为 IObservable 作者需要担心的事情还是以某种方式内置?

更新:

这是我的 C# 版本的 Brian 的 F# 解决方案

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

编辑:如果 Dispose 被调用两次,不要抛出 ObjectDisposedException

最佳答案

official documentation反对用户自己实现 IObservable。相反,用户应该使用工厂方法 Observable.Create

When possible, implement new operators by composing existing operators. Otherwise implement custom operators using Observable.Create

碰巧 Observable.Create 是 Reactive 内部类 AnonymousObservable 的简单包装器:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

我不知道他们为什么不公开他们的实现,但是嘿,随便吧。

关于c# - 从头开始实现 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1768974/

相关文章:

c# - ubuntu linux 上的虚拟 socat 串口和 c#

c# - 处理后流式传输 IConnectableObservable

c# - 调度程序调度程序 - Rx

c# - 环境.SpecialFolder问题

c# - "Page.IsValid cannot be called"出现在 Page_PreRender 事件处理程序中

C# 匿名函数递归

.net - RX运算符的更改间隔?

c# - 使用 Reactive Extensions 进行数据库轮询

java - 如何使用 RxJava 加载相关对象

c# - GridView EditItemTemplate 中的 DataBinding DropDownList