c# - 跨线程编码异常(响应式(Reactive)扩展)

标签 c# exception ienumerable system.reactive

我有一个 IEnumerable 序列,其中包含一些阻塞网络操作(在下面的示例代码中替换为一些简单的 yield )。我正在使用响应式(Reactive)扩展将通过网络传输的数据流转换为可观察的序列。

我正在寻找一种方法将异常编码(marshal)到主线程,以便未处理的异常不会导致我的应用程序终止。我无法将 try/catch block 放置在 IEnumerable 线程上,因为编译器不允许在 try/catch 语句中使用yield return 语句。

using System;
using System.Collections.Generic;
using System.Concurrency;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication7
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                var observable = TestEnumerable().ToObservable(Scheduler.NewThread); //Needs to be on a new thread because it contains long-running blocking operations

                // Use subject because we need many subscriptions to a single data source
                var subject = new Subject<int>();

                subject.Subscribe(x => Console.WriteLine("Subscriber1: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    x => Console.WriteLine("Subscriber1 ERROR: " + x+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    () => Console.WriteLine("Subscriber1 Finished"+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId));
                subject.Subscribe(x => Console.WriteLine("Subscriber2: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    x => Console.WriteLine("Subscriber2 ERROR: " + x+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    () => Console.WriteLine("Subscriber2 Finished"+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId));

                Console.WriteLine("Press key to start receiving data");
                Console.ReadKey();
                var sub = observable.Subscribe(subject);

                Console.WriteLine("Press key to exit");
                Console.ReadKey();
                sub.Dispose();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Caught exception on main thread");
            }

        }

        public static IEnumerable<int> TestEnumerable()
        {
            while (true)
            {
                yield return 1;
                Thread.Sleep(200);
                yield return 2;
                Thread.Sleep(200);
                yield return 3;
                Thread.Sleep(200);
                throw new InvalidOperationException();
            }
        }
    }
}

最佳答案

解决方案取决于您是否有可用的 Dispatcher/SynchronizationContext。在这种情况下使用它当然是更好的选择。

解决方案 1:Dispatcher/SynchronizationContext 可用

(即使用 WPF、Windows 窗体或自定义调度程序循环)

您可以使用ObserveOn + Catch将错误移回调度程序线程。我已经在 WPF 应用程序中看到过它的使用,并且效果很好。

你如何移动你的IScheduler/DispatcherScheduler周围由你决定(我们使用了 IoC)

public static IObservable<T> CatchOn<T>(this IObservable<T> source, 
    IScheduler scheduler)
{
    return source.Catch<T,Exception>(ex => 
        Observable.Throw<T>(ex).ObserveOn(scheduler));
}

// We didn't use it, but this overload could useful if the dispatcher is 
// known at the time of execution, since it's an optimised path
public static IObservable<T> CatchOn<T>(this IObservable<T> source, 
    DispatcherScheduler scheduler)
{
    return source.Catch<T,Exception>(ex => 
        Observable.Throw<T>(ex).ObserveOn(scheduler));
}

解决方案 2:没有可用的调度程序

而不是使用 Console.ReadKey() ,使用ManualResetEvent并等待它,然后抛出可变错误:

        static void Main(string[] args)
        {
            try
            {
                Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                var observable = TestEnumerable().ToObservable(Scheduler.NewThread); //Needs to be on a new thread because it contains long-running blocking operations

                // Use subject because we need many subscriptions to a single data source
                var subject = new Subject<int>();

                Exception exception = null;
                ManualResetEvent mre = new ManualResetEvent(false);

                using(subject.Subscribe(
                    x => Console.WriteLine(x),
                    ex => { exception = ex; mre.Set(); },
                    () => Console.WriteLine("Subscriber2 Finished")))

                using(subject.Subscribe(
                    x => Console.WriteLine(x),
                    ex => { exception = ex; mre.Set(); },
                    () => Console.WriteLine("Subscriber2 Finished")))

                using (observable.Subscribe(subject))
                {
                    mre.WaitOne();
                }

                if (exception != null)
                    throw exception;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Caught exception on main thread");
            }

        }

关于c# - 跨线程编码异常(响应式(Reactive)扩展),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5146345/

相关文章:

PHP,异常与常规类有何不同

c# 将字符串转换为 IEnumerable<T>

c# - 如何验证类在使用前是否配置了所有属性?

java - 从二维数组抛出异常后继续

java - 获取 IMethod 捕获的所有异常

c# - 在 C# 中使用迭代器创建 Enumerable 或 Enumerator 之间的区别

c# - 检查两个列表的修改

c# - 无法在c#中打开链接

c# - Asp.net core 为身份创建接口(interface)

C# RegEx 在管道分隔文件中查找空单元格