c# - 如何分离 IObservable 和 IObserver

标签 c# reactive-programming system.reactive publish-subscribe reactiveui

更新:查看底部的示例
我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给 OnNext .可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的任务。这是一个实现。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            var subject = new Subject<string>();

            //Create a class and inject the subject as IObserver
            new Publisher(subject);

            //Create a class and inject the subject as IObservable
            new Subscriber(subject, 1.ToString());
            new Subscriber(subject, 2.ToString());
            new Subscriber(subject, 3.ToString());

            //Run the loop for 3 seconds
            await Task.Delay(3000);
        }

        class Publisher
        {
            public Publisher(IObserver<string> observer)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        observer.OnNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            public string Name;

            //Listen for OnNext and write to the debug window when it happens
            public Subscriber(IObservable<string> observable, string name)
            {
                Name = name;
                var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
            }
        }
    }
}
输出:

Name: 1 Message: Hi


Name: 2 Message: Hi


Name: 3 Message: Hi


Name: 1 Message: Hi


Name: 2 Message: Hi


Name: 3 Message: Hi


这工作正常。注意只有一个 IObserver发送消息,但所有订阅都接收消息。 但是,我该如何分离 IObservableIObserver ? 它们以 Subject 的形式粘在一起.这是另一种方法。
[TestMethod]
public async Task RunMessagingAsync2()
{
    var observers = new List<IObserver<string>>();

    var observable = Observable.Create(
    (IObserver<string> observer) =>
    {
        observers.Add(observer);

        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    observer.OnNext(GetSomeData());
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                await Task.Delay(500);
            }
        });

        return Disposable.Create(() => { });
    });

    //Create a class and inject the subject as IObservable
    new Subscriber(observable);
    new Subscriber(observable);

    //Run the loop for 10 seconds
    await Task.Delay(10000);

    Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}
这里的问题是这会创建两个单独的 Task s 和两个独立的 IObserver s。每个订阅都会创建一个新的 IObserver。您可以确认,因为 Assert这里失败了。这对我来说真的没有任何意义。根据我对响应式(Reactive)编程的理解,我不希望 Subscribe方法在这里创建一个新的IObserver每一次。退房 this gist .这是对 Observable.Create example 的轻微修改.它显示了 Subscribe 方法如何在每次调用时创建 IObserver。 如何在不使用 Subject 的情况下实现第一个示例中的功能?
这是另一种完全不使用响应式(Reactive) UI 的方法...您可以创建一个 Subject如果您愿意,可以从出版商那里购买,但这不是必需的。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";
   
        class Publisher
        {
            public Publisher(Action<string> onNext)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        onNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            //Listen for OnNext and write to the debug window when it happens
            public void ReceiveMessage(string message) => Debug.WriteLine(message);
        }

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            //Create a class and inject the subject as IObservable
            var subscriber = new Subscriber();

            //Create a class and inject the subject as IObserver
            new Publisher(subscriber.ReceiveMessage);

            //Run the loop for 10 seconds
            await Task.Delay(10000);
        }
    }
}
最后,我应该补充一点,ReactiveUI 曾经有一个 MessageBus class .我不确定它是否已被删除,但不再推荐它。他们建议我们改用什么?
工作示例
这个版本是对的。我想我现在唯一要问的是 我如何用 Observable.Create 做同样的事情 ? Observable.Create 的问题是它为每个订阅运行操作。这不是预期的功能。无论有多少订阅,这里的长时间运行的任务都只运行一次。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    internal class BasicObservable<T> : IObservable<T>
    {
        List<IObserver<T>> _observers = new List<IObserver<T>>();

        public BasicObservable(
            Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default
            ) =>

            Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
                        var data = getData();
                        _observers.ForEach(o => o.OnNext(data));
                    }
                    catch (Exception ex)
                    {
                        _observers.ForEach(o => o.OnError(ex));
                    }
                }

                _observers.ForEach(o => o.OnCompleted());

            }, cancellationToken);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _observers.Add(observer);
            return Disposable.Create(observer, (o) => _observers.Remove(o));
        }
    }

    public static class ObservableExtensions
    {
        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, default, cancellationToken);

        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, interval, cancellationToken);
    }

    [TestClass]
    public class UnitTest1
    {
        string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            Func<string> getData = GetData;

            var publisher = getData.CreateObservable(cancellationToken);

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; true; i++)
            {
                if (i >= 5)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }
        }
    }

}

最佳答案

首先你必须熟悉"cold" and "hot" observables的理论。 .这是来自 Introduction to RX 的定义.

  • 是被动的序列,并开始根据请求(订阅时)生成通知。
  • 热门 是事件的序列,无论订阅如何都会产生通知。

  • 你想要的是一个热门的 observable,问题是 Observable.Create方法创建冷观察。但是您可以使用 Publish 使任何可观察对象变得热。运算符(operator)。此运算符提供了一种方法,使多个独立观察者共享单个底层订阅。例子:
    int index = 0;
    var coldObservable = Observable.Create<int>(observer =>
    {
        _ = Task.Run(async () =>
        {
            while (true)
            {
                observer.OnNext(++index);
                await Task.Delay(1000);
            }
        });
        return Disposable.Empty;
    });
    
    IConnectableObservable<int> hotObservable = coldObservable.Publish();
    hotObservable.Connect(); // Causes the start of the loop
    
    hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
    hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));
    
    coldObservable创建者 Observable.Create订阅时 hotObservable.Connect方法被调用,然后该单个订阅生成的所有通知将传播到 hotObservable 的所有订阅者。 .
    输出:
    Observer A received #1
    Observer B received #1
    Observer A received #2
    Observer B received #2
    Observer A received #3
    Observer B received #3
    Observer A received #4
    Observer B received #4
    Observer A received #5
    Observer B received #5
    Observer A received #6
    Observer B received #6
    ...
    
    重要提示:上面例子的目的是为了演示Publish运营商,而不是作为高质量 RX 代码的示例。它的一个问题是,通过在连接到源之后订阅观察者在理论上可能不会将第一个通知发送给部分或全部观察者,因为它可能在订阅之前创建。换句话说,存在竞争条件。
    有一种管理 IConnectableObservable 生命周期的替代方法。 , 运营商 RefCount :

    Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

    var hotObservable = coldObservable.Publish().RefCount();
    
    这样你就不需要 Connect手动。连接在第一次订阅时自动发生,并在最后一次取消订阅时自动处理。

    关于c# - 如何分离 IObservable 和 IObserver,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64508362/

    相关文章:

    c# - 如何使用 c# 将使用 odbc 源的特殊字符获取到 Oracle 数据库?

    javascript - RxJs 映射后未定义的值

    spring-boot - 为什么使用 switchIfEmpty 时项目reactor会无限期挂起?

    从 IEnumerable<IObservable<string>> 到 IObservable<string> 的 C# 响应式(Reactive)扩展

    javascript - 尝试使用 fetch api 和 C# Web API 上传图像

    C# 在使用循环访问内存时比 Java 慢一半?

    c# - SelectedIndexChanged 事件未针对 1 个列表项触发

    java - 无法解析方法 subscribe(rx.Scheduler)

    .net - 使用 RX Throttle 时的跨线程异常

    Rx 的 F# 工作流构建器