c# - 热/冷 Observable,增加订阅者

标签 c# system.reactive

如何将 MainEngine Observable 转换为 Cold?来自这个例子:

    public IObservable<int> MainEngine
    {
        get
        {
            Random rnd = new Random();
            int maxValue = rnd.Next(20);
            System.Diagnostics.Trace.TraceInformation("Max value is: " + maxValue.ToString());

            return (from sinlgeInt in Enumerable.Range(0, maxValue)
                    select sinlgeInt).ToObservable();
        }
    }

    public void Main()
    {
        // 1
        MainEngine.Subscribe(
                onNext: (item) => { System.Diagnostics.Trace.TraceInformation("Value is: " + item.ToString()); }
        );

        // 2
        MainEngine.Subscribe(
                onNext: (item) => { System.Diagnostics.Trace.TraceInformation("Gonna put it into XML: " + item.ToString()); }
        );
    }

问题 1:在订阅者 1 和订阅者 2 上我得到了不同的结果,但我希望他们都收到相同的结果。

问题 2:从我添加第二个订阅者开始,他们都继续收到相同的结果。

最佳答案

关于您的第一个问题,问题是观察者没有订阅相同的 IObservable,因为您调用了两次 getter。

IObservable 分配给局部变量似乎可以解决问题:

IObservable<int> mainEngine = MainEngine;

mainEngine.Subscribe(onNext: (item) => { /* ... */ });
mainEngine.Subscribe(onNext: (item) => { /* ... */ });  

关于你的第二个问题,如果你想分享对单个 IObservable 的订阅,你可以使用 Publish 方法:

IConnectableObservable<int> published = MainEngine.Publish();

published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 1"); });
published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 2"); });

published.Connect();

然后两个订阅者将以交错的方式看到来自 IObservable 的结果:

0 on observer 1
0 on observer 2
1 on observer 1
1 on observer 2
etc.

您还可以在调用 Subscribe 之后订阅新的观察者,之后所有订阅者都将看到相同的事件。您可以通过在新线程上运行可观察对象并引入延迟来修改您的示例以对此进行测试:

public static void Main()
{
    Random rnd = new Random();
    int maxValue = rnd.Next(20);

    /* Zip with Observable.Interval to introduce a delay */
    IObservable<int> mainEngine = Observable.Range(0, maxValue, Scheduler.NewThread)
        .Zip(Observable.Interval(TimeSpan.FromMilliseconds(100)), (a, b) => a);

    /* Publish the observable to share a subscription between observers */
    IConnectableObservable<int> published = mainEngine.Publish();

    /* Subscribe the first observer immediately, events are not yet being observed */
    published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 1"); });

    /* Start pushing events to the first observer */
    published.Connect();

    /* Wait one second and then subscribe the second observer */
    Thread.Sleep(1000);
    published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 2"); });

    Console.ReadKey();
}

您只会在第一个观察者身上看到一秒钟的事件,然后两个观察者将同时看到每个事件。

关于c# - 热/冷 Observable,增加订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11953406/

相关文章:

c# - 将匿名类型反序列化为动态类型

c# - 从 ASP.NET 中的 Seek Position 流式传输 MP4 视频

c# - 我应该在我的接口(interface)上公开 IObservable<T> 吗?

c# - 为什么这个 Cancellation Disposable 永远不会在 Observable.Dispose() 上被取消?

c# - WPF:按下鼠标左键更改边框的背景颜色

c# - 连续创建位图会导致内存泄漏

c# - 从 C# 上传 zip 文件到 S3

c# - 围绕一组不断变化的依赖可观察量创建可观察量

.net - "Push"linq 与响应式(Reactive)框架

.net - Observable.Timer() 会导致内存泄漏吗?