c# - SubscribeOn() 在这里做什么?

标签 c# system.reactive

我正在自学响应式编程,方法是解决随机问题并毫不羞愧地问愚蠢的新手问题。在弄清楚线程调度的工作原理时,我设法难住了自己。虽然我很确定这段代码没有逻辑意义,但我也无法理解发生了什么。弄清楚这一点可能会帮助我。这是代码:

var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();

var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1), () => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2), () => emitter.OnCompleted());

var subscription = emitter.SubscribeOn(newThreadScheduler)
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );

testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);

Console.WriteLine("DONE.");
Console.ReadLine();

期望可能是:

one    
DONE.
Complete!

可能会交错,因为我不太确定 SubscribeOn() 会做什么。我得到的是:

DONE.
Complete!

这里究竟发生了什么?为什么项目在完成之前没有生产? ObserveOn() 在这种情况下按我预期的方式工作,我理解原因:它在其他某个线程上运行委托(delegate),它们可以与“DONE”交错。那么 SubscribeOn() 到底在做什么?

最佳答案

这里只是一个竞争条件。

如果我们把所有的代码都还原成

var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();

var subscription = emitter
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );



Console.WriteLine("DONE.");
Console.ReadLine();

我们得到相同的结果。 通过使用 Subject<T>你不会得到任何缓存行为,除了 OnCompleted通知。

SubscribeOn运营商将安排在提供的 IScheduler 上完成任何订阅工作实例。 在订阅 Subject<T> 的情况下几乎没有工作要做。 它几乎就像将回调注册到回调列表一样简单。

NewThreadScheduler 上安排工作会创建一个新的线程,然后创建一个内部事件循环来处理预定的工作。 这非常快,但确实需要创建一个新线程,一个 EventloopScheduler 并执行到新线程的上下文切换。

在您的示例中,您安排了 OnNextOnCompleted关于 TestScheduler 的通知. 那你SubscribeOnNewThreadScheduler . 之后,您开始处理 TestScheduler 的所有计划工作。实例。 这些虚拟计划项目的处理只是迭代计划项目,执行委托(delegate)并推进虚拟时钟。 这速度快得令人难以置信。

更具体地说,下面的代码类似于您编写的代码

var newThreadScheduler =  new NewThreadScheduler();

var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));

foreach (var callback in callbacks)
{
    callback("one");
}

Console.WriteLine("Done");

这里我们只是有一个回调操作列表(称它们为订阅者或观察者)。 然后,我们在新线程上异步安排添加这些回调之一。 然后立即迭代回调并将字符串“one”发送给它们中的每一个。 结果是

Done

NewThreadScheduler只是没有足够的时间来启动新线程、安排操作,然后在主线程迭代集合之前执行该操作。

因此,我认为您没有遵循一些准则: 1)避免主题;-) 2) 不要混合线程和单元测试。我假设存在 TestScheduler是因为你正在测试这个。但是,您可以使用 TestScheduler 的两个实例例如背景和前景实例。

为了更有帮助,我会提供积极的指导,建议您从测试中删除第二个调度程序。 使用 TestScheduler在你的实例 SubscribeOn运营商。

接下来,我建议使用 TestScheduler 来替换 subjects+scheduling 的使用。的 Observable 序列工厂方法即 CreateColdObservable . 最后我不知道前进到 1s 的特定时间是否比仅使用 Start 有任何收获。方法。 我认为这会减少噪音和魔法值 1s 的使用。

var testScheduler = new TestScheduler();

var source = testScheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
    ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));

var subscription = source.SubscribeOn(testScheduler)
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );

testScheduler.Start();

Console.WriteLine("DONE.");
Console.ReadLine();

现在唯一的问题是 SubscribeOn调用非常多余。

仅供引用:NewThreadScheduler 的代码- https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs

关于c# - SubscribeOn() 在这里做什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41193324/

相关文章:

c# - 数据适配器.选择命令.连接

C# XML反序列化问题

c# - 如何在xamarin表单中使用SQLite-Net-PCL?

c# - 以有限订阅者同时订阅可观察集合的简单方法

c# - 如果最后一个回调尚未完成,则忽略传入的流更新

c# - Observable.Timer 或 TPL with Task.Delay

c# - Rx 中的热连接

c# - 如何在 LINQ 中应用 if 条件?

c# - 一或零到多,代码优先

c# - 如何将轮询系统变成 Rx.Net IObservable?