c# - RX2.0 : ObjectDisposedException after diposing EventLoopScheduler

标签 c# system.reactive

我们最近将系统从RX 1.11111移植到RX 2.0,并发现了此问题。我们为ObserveOn使用EventLoopScheduler,如下所示:

IDisposable subscription = someSubject
    .ObserveOn(m_eventLoopScheduler)
    .SomeMoreRXFunctions()
    .Subscribe((something)=>something)

调度程序位于应用程序导出(m_eventLoopScheduler.Dispose)上。在此之前,我们将所有对observable(subscription.Dispose)的订阅都处理掉。

尽管如此,我们还是在ObjectDisposedException中得到了EventLoopScheduler.Schedule。捕获该异常是不可能的,因为它起源于RX线程。这几乎就像Dispose并不能消除某个队列中的所有项。

我们试图删除对EventLoopScheduler.Dispose的调用,但异常消失了。但是,尽管处置了所有订阅,但SomeMoreRXFunctions()中的代码又执行了约10次。

还有其他方法可以正确关闭EventLoopScheduler吗?

最佳答案

关于订阅的一些观察

(很抱歉,无法抗拒双关语!)几乎每个Rx运算符实现的接口(interface)IObservable<out T>只有一种重要的方法:

IDisposable Subscribe(IObserver<T> observer);

纯粹通过此方法及其对返回值的处理,观察者(实现IObserver<T>)可以确定预订的开始和结束时间。

当对作为链的一部分的Observable进行订阅时,通常(直接或间接)进行订阅,这将导致订阅在链的更上层进行。准确地确定是否以及何时发生这种情况取决于给定的Observable。

在许多情况下,收到的订阅与进行的订阅之间的关系不是一对一的。一个示例是Publish(),无论其接收到多少个订阅,它最多只能对其源进行一个订阅。这实际上就是发布的重点。

在其他情况下,该关系具有时间方面。例如,Concat()在第一个流具有OnCompleted()之前将不会订阅第二个流,这可能永远不会!

值得花点时间检查Rx Design Guidelines,因为他们有一些非常相关的事情要说:

接收设计指南

4.4. Assume a best effort to stop all outstanding work on Unsubscribe. When unsubscribe is called on an observable subscription, the observable sequence will make a best effort attempt to stop all outstanding work. This means that any queued work that has not been started will not start.

Any work that is already in progress might still complete as it is not always safe to abort work that is in progress. Results from this work will not be signalled to any previously subscribed observer instances.



底线

注意这里的含义;最重要的是,当可能进行或处置任何上游订阅时,这完全取决于Observable的实现。换句话说,绝对不能保证:处置订阅将导致Observable处置其直接或间接进行的任何或所有订阅。这适用于运营商或其上游订阅者使用的任何其他资源(例如计划的操作)。

您所能期望的最好的是,每个上游运营商的作者确实尽了最大的努力来停止所有出色的工作。

回到问题(最后!)

没有看到SomeMoreRXFunctions的内容,我无法确定,但是您看到的异常很有可能是由您引起的,这是因为-尽管处置了您所知道的订阅,但由于处置了调度程序,您已经从下面剥离了地毯仍在运行订阅的脚。实际上,您是在导致以下情况:
void Main()
{
    var scheduler = new EventLoopScheduler();

    // Decide it's time to stop
    scheduler.Dispose();

    // The next line will throw an ObjectDisposedException
    scheduler.Schedule(() => {});
}

编写一个可能导致此问题的完全合理的运算符很容易-甚至是不直接使用调度程序的运算符!考虑一下:
public static class ObservableExtensions
{
    public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
        (this IObservable<TSource> source, IObservable<TDelay> delay)
    {
        return Observable.Create<TSource>(observer =>
        {        
            var subscription = new SerialDisposable();
            subscription.Disposable = delay
                .IgnoreElements()
                .Subscribe(_ => {}, () => {
                    Console.WriteLine("Waiting to subscribe to source");
                    // Artifical sleep to create a problem
                    Thread.Sleep(TimeSpan.FromSeconds(2));
                    Console.WriteLine("Subscribing to source");
                    // Is this line safe?
                    subscription.Disposable = source.Subscribe(observer);
                }); 
            return subscription;
        });
    }    
}

一旦传递的可观察到的延迟完成,此运算符(operator)将订阅源。看看它有多合理-它使用SerialDisposable正确地将两个潜在的,在时间上分开的订阅作为单个一次性文件呈现给它的观察者。

但是,颠覆该运算符并使其引起异常是很简单的:
void Main()
{
    var scheduler = new EventLoopScheduler();
    var rx = Observable.Range(0, 10, scheduler)
                       .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
    var subs = rx.Subscribe();

    Thread.Sleep(TimeSpan.FromSeconds(2));
    subs.Dispose();
    scheduler.Dispose();    
}

这里发生了什么事?我们正在EventLoopScheduler上创建Range,但是将ReasonableDelay附加到使用默认调度程序通过Timer创建的延迟流。

现在,我们进行订阅,等待直到延迟流完成,然后按照“正确的顺序”处理我们的订阅和EventLoopScheduler。

我在Thread.Sleep中插入的人为延迟确保了很容易自然发生的竞争状况-延迟已完成,预订已被取消,但为时已晚,无法阻止Range运算符访问已处置的EventLoopScheduler。

延迟部分完成后,我们甚至可以加强合理的努力来检查观察者是否已取消订阅:
// In the ReasonableDelay method
.Subscribe(_ => {}, () => {        
    if(!subscription.IsDisposed) // Check for unsubscribe
    {
        Console.WriteLine("Waiting to subscribe to source");
        // Artifical sleep to create a problem            
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine("Subscribing to source");
        // Is this line safe?                    
        subscription.Disposable = source.Subscribe(observer);
    }
}); 

这不会有帮助。也没有办法仅在此运算符的上下文中使用锁定语义。

你做错了什么

您无需处理该EventLoopScheduler!一旦将其传递给其他Rx运营商,就已经将其责任转移给了它。 Rx运算符(operator)应遵循准则,以尽可能及时的方式清理其订阅-这将意味着直接或间接取消EventLoopScheduler上的任何暂挂的计划项目,并停止任何进一步的计划,以便其队列尽早清空。可能的。

在上面的示例中,您可以将问题归因于使用多个调度程序的某些人为设计以及在ReasonableDelay中强制执行Sleep -但是,不难想象出运算符(operator)无法立即清理的真实情况。

本质上,通过部署Rx调度程序,您正在执行与线程中止相同的Rx。就像在这种情况下一样,您可能需要处理一些异常(exception)情况!

正确的做法是拆开神秘的SomeMoreRXFunctions(),并确保它们在合理可能的范围内尽可能遵守准则。

关于c# - RX2.0 : ObjectDisposedException after diposing EventLoopScheduler,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13109054/

相关文章:

c# - 寻找用于即时消息传递的库,例如 libpurple,但用 C# 编写

c# - 工作流基础 4.5 "Expression Activity type ' CSharpValue` 1' requires compilation in order to run."

c# - 来自 Reactive 扩展的 PCL WeakEventManager 在 3 - 7 分钟内处理事件

c# - 具有节流定时器重生的 Rx GroupByUntil

c# - CombineLatest 是否保留了可观察量的顺序?

c# - 无法使用证书存储中的客户端证书,只能通过从文件加载

c# - Azure 是否有开箱即用的 key 分布式锁定方式?

c# - ObserveOn with Scheduler.NewThread 不这么观察,如果observer的OnNext被阻塞继续

c# - 接收 : Ignoring updates caused by Subscribers

c# - DataGridView、BindingSource、DataMember - 我不明白