system.reactive - 使用 Rx 处理可观察序列中的错误

标签 system.reactive reactive-programming

如果发生错误,有没有办法让一个可观察的序列恢复执行序列中的下一个元素?
this post 看来,您需要在 Catch() 中指定一个新的可观察序列来恢复执行,但是如果您只需要继续处理序列中的下一个元素呢?有没有办法实现这一目标?

更新:
场景如下:
我有一堆需要处理的元素。处理由一系列步骤组成。我有
将步骤分解为我想要编写的任务。
我遵循了发布 here 的 ToObservable() 指南
将任务转换为可观察的组合。
所以基本上我正在做这样的事情 -

foreach(element in collection)
{
   var result = from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;
   result.subscribe( register on next and error handlers here)
 }

或者我可以这样:
var result = 
        from element in collection.ToObservable() 
        from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;

即使让我们说处理其他元素,这里继续处理其他元素的最佳方法是什么
其中一个元素引发异常。我希望能够记录错误并在理想情况下继续前进。

最佳答案

James 和 Richard 都提出了一些很好的观点,但我认为他们没有为您提供解决问题的最佳方法。

James 建议使用 .Catch(Observable.Never<Unit>()) .当他说“将......允许流继续”时他是错误的,因为一旦你遇到异常,流必须结束 - 这就是理查德在提到观察者和可观察者之间的契约时所指出的。

另外,使用 Never以这种方式将导致您的 observables 永远不会完成。

简短的回答是 .Catch(Observable.Empty<Unit>())是将序列从以错误结尾的序列更改为以完成结尾的序列的正确方法。

您找到了使用 SelectMany 的正确想法处理源集合的每个值,以便您可以捕获每个异常,但是您会遇到一些问题。

您使用任务 (TPL) 只是为了将函数调用转换为 observable。这会强制您的 observable 使用任务池线程,这意味着 SelectMany语句可能会以不确定的顺序生成值。

您还隐藏了处理数据的实际调用,从而使重构和维护变得更加困难。

我认为您最好创建一个允许跳过异常的扩展方法。这是:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return
        source
            .Select(t =>
                Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
}

使用此方法,您现在可以简单地执行以下操作:
var result =
    collection.ToObservable()
        .SelectAndSkipOnException(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        });

这段代码要简单得多,但它隐藏了异常。如果你想在让你的序列继续的同时坚持异常(exception),那么你需要做一些额外的时髦。向 Materialize 添加几个重载扩展方法可以保留错误。
public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector)
{
    Func<Notification<T>, Notification<R>> f = nt =>
    {
        if (nt.Kind == NotificationKind.OnNext)
        {
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;
                ex.Data["Selector"] = selector;
                return Notification.CreateOnError<R>(ex);
            }
        }
        else
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                return Notification.CreateOnError<R>(nt.Exception);
            }
            else
            {
                return Notification.CreateOnCompleted<R>();
            }
        }
    };
    return source.Select(nt => f(nt));
}

这些方法允许你这样写:
var result =
    collection
        .ToObservable()
        .Materialize(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        })
        .Do(nt =>
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                /* Process the error in `nt.Exception` */
            }
        })
        .Where(nt => nt.Kind != NotificationKind.OnError)
        .Dematerialize();

你甚至可以链接这些 Materialize方法和使用 ex.Data["Value"] & ex.Data["Selector"]获取抛出错误的值和选择器函数。

我希望这会有所帮助。

关于system.reactive - 使用 Rx 处理可观察序列中的错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6052788/

相关文章:

c# - 如何在 Rx.NET 中以正确的方式约束并发

c# - 可观察到的 : ANDing sources and completion

java - 使用多线程 RxJava 的响应式(Reactive)拉取

java - Reactor 取消订阅的方式

java - Flux 未在 Spring 5 react 器中订阅

haskell - 推荐阅读/教程来了解reactive-banana FRP库

c# - 响应式扩展测试调度程序模拟时间流逝

c# - 如何在IObservable管道中保留异常并在最后将其重新抛出?

c# - 使用 Reactive Extensions 动态连接序列

java Process.waitfor 是一个阻塞调用