c# - 为什么从给定的订阅者抛出时从未调用过OnError回调?

标签 c# asynchronous system.reactive observable

请遵守以下单元测试:

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace UnitTests
{
    [TestClass]
    public class TestRx
    {
        public const int UNIT_TEST_TIMEOUT = 5000;

        private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
        {
            return Observable.Create<int>(async (obs, cancellationToken) =>
            {
                for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
                {
                    int value = i;
                    obs.OnNext(await Task.Factory.StartNew(() =>
                    {
                        Thread.Sleep(msWait);
                        return value;
                    }));
                }
            });
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void Subscribe()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
            }, e => Assert.Fail(), () =>
            {
                Assert.AreEqual(100, i);
                tcs.TrySetResult(null);
            });

            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeCancel()
        {
            var tcs = new TaskCompletionSource<object>();
            var cts = new CancellationTokenSource();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    cts.Cancel();
                }
            }, e =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, () =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, cts.Token);

            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeThrow()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    throw new Exception("xo-xo");
                }
            }, e =>
            {
                Assert.AreEqual("xo-xo", e.Message);
                tcs.TrySetResult(null);
            }, Assert.Fail);

            tcs.Task.Wait();
        }
    }
}


单元测试SubscribeCancelSubscribeThrow超时,因为从未调用OnError回调,因此对任务的等待永远不会结束。

怎么了?

附言

该问题与How to wrap SqlDataReader with IObservable properly?有关

编辑

同时,我创建了一个新的Rx问题-https://rx.codeplex.com/workitem/74

http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the-given-subscriber?forum=rx

编辑2

即使遵循Rx Design Guidelines的6.5节-“订阅实现不应抛出”,以下观察者实现也会产生完全相同的结果:

private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
    return Observable.Create<int>(async (obs, cancellationToken) =>
    {
        try
        {
            for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
            {
                int value = i;
                obs.OnNext(await Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(msWait);
                    return value;
                }));
            }
            obs.OnCompleted();
        }
        catch (Exception exc)
        {
            obs.OnError(exc);
        }
    });
}


编辑3

我开始相信,当将异步可观察序列集成到其他方式的同步代码中时,应该编写这样的代码(通常在一个地方或另一个地方在服务器端就是这种情况):

var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
  try
  {
    ...
  }
  catch (Exception e)
  {
    DoErrorLogic();
    tcs.TrySetException(e);
  }
}, e =>
{
  DoErrorLogic();
  tcs.TrySetException(e);
}, () => 
{
  DoCompletedLogic();
  tcs.TrySetResult(null);
});

tcs.Task.Wait();


真的是这样吗?

编辑4

我认为这终于开始使我生疏的大脑陷入混乱。我现在要切换到其他帖子-How to wrap SqlDataReader with IObservable properly?

最佳答案

此行为是设计使然。如果订阅者抛出异常(顺便说一句,这是一种不好的做法),则Rx框架会正确地说明它已死,并不再与之通信。如果取消订阅,这也不是错误-只是请求不发送任何其他类型的事件-Rx很荣幸。

编辑以回应评论

我认为文档中没有简单的参考要指出的-您所看到的行为是内在的,它是隐式的。我能得到的最接近的结果是将您指向AnonymousSafeObserverAutoDetatchObserver的源代码。后者有一个说明性的场景,可能会有所帮助,但其中有些涉及。

也许类比会有所帮助。想象一下,数据流事件是由新闻通讯社发送报纸。订户是家庭。

订阅者引发异常

报社愉快地分发报纸,直到一天,其中一名订户-琼斯先生-放下汽油,他的房屋爆炸而杀死琼斯先生并摧毁了房屋(引发了未经处理的例外)。报社意识到,他不能再将报纸交付给琼斯先生,也不能发送终止通知,报刊供应也没有问题(因此OnError或OnCompleted不合适),报社的订阅人数减少了一个。

将此与报纸印刷商不经意间使用易燃墨水进行对比,并导致工厂起火。现在,糟糕的新闻发布商确实必须向所有无限期停止供应的订户发送解释性说明(OnError)。

订阅者取消订阅

琼斯先生正在接受订阅的报纸,直到有一天他认为自己厌倦了无数令人沮丧的故事,并要求取消订阅。报社有义务。他没有给琼斯先生发任何便条,以说明报纸已停止印刷版本(没有OnCompleted),但没有。他也没有给琼斯先生发送任何说明该报纸已经倒闭的便条(没有OnError)-他只是按照琼斯先生的要求停止发送报纸。

对Edit3的回应

我同情你的斗争。我注意到在您的整个代码中,您一直在尝试将TPL(任务)惯用语与Rx网格化。这样的尝试通常感觉很笨拙,因为它们实际上是完全不同的世界。像这样的段落很难评论:


  
    我开始相信,当将异步可观察序列集成到其他方式的同步代码中时,应该编写这样的代码(通常在一个地方或另一个地方在服务器端就是这种情况):
  


与布兰登精心设计的断言完全一致,我无法想到在您尝试使用服务器端在服务器端集成异步代码和同步代码的真正实例。对我来说,这感觉像是设计的气味。习惯上讲,人们会尝试使代码保持被动状态-进行订阅,并让订阅者以被动方式处理工作。我想不起来有必要以您描述的方式过渡到同步代码。

当然,查看您在Edit3中编写的代码,尚不清楚您要实现的目标。对订户中的错误做出反应不是源的责任。这是尾巴在摇狗。需要在那里以确保订阅者的服务连续性的异常处理程序应该在订阅处理代码中,而不是在可观察的源中-异常处理程序应该只关注免受流氓观察者行为的影响。这种逻辑在上面链接的AnonymousSafeObserver中实现,并且由大多数Rx提供的运算符使用。可观察的对象很可能具有处理其源数据连续性的逻辑-但这是另一个问题,并不是您在代码中要解决的问题。

无论您试图通过调用ToTaskWait桥接到同步代码的任何地方,都有可能需要仔细考虑您的设计。

我认为提供更具体的问题说明(可能是从您要解决的现实情况中得出)可以为您提供更多有用的建议。您说的“ SqlDataReader”示例...


  
    最终,人们可以通过订阅它来直接使用可观察的[包装SqlDataReader],但是由于周围的大多数代码仍然是同步的,因此他们不得不在某个时候等待结束(阻塞线程)。
  


...突出显示了您所处的设计困境。在这种情况下,您推断使用IEnumerable<T>界面-或要求使用IObservable<List<T>>显然会更好。但是关键是要放宽眼界,实际上您试图将SqlDataReader包装在一个可观察的包装器中实际上是一种设计气味-因为这是为响应特定的一次性请求而提供的固定数据。这可能是异步情况,但实际上不是被动情况。与更典型的反应性情况相反,例如“在获得股票X时将价格发送给我”,在这种情况下,您将完全根据订户的源头设置将来的数据流,然后订阅者做出反应。

关于c# - 为什么从给定的订阅者抛出时从未调用过OnError回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23862676/

相关文章:

javascript - 为什么 async 会阻止 twitter 回调触发?

ios - 在 UITableView Swift 中滚动时重复图像

c# - RX.Net 中的 Subject 总是有害的吗?

c# - EF 4.3 抛出 "An error occurred while getting provider info from the dB. caused by EF using an incorrect con. string. Check Inner Excptin."

c# - 如何对读取 Excel 阅读器进行单元测试?

java - Spring Repository @Async Future<List<T>> 返回错误类型

c# - RX 自动完成框

c# - 如何使用 Reactive Extensions 来限制客户端请求

c# - FluentAssertions,确保 IEnumerable 仅包含单个元素

c# - ORTools SAT 替代 CS IntVar[].Element(IntExpr index) 以计算互惠分配的成本