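I have an IEnumerable sequence that contains some blocking network operations (replaced with simple yields in the sample code below). I am using Reactive Extensions to turn the stream of data arriving over the network into an observable sequence.
I am looking for a way to marshal exceptions over to the main thread so that an unhandled exception does not terminate my application. I cannot put a try/catch block on the IEnumerable thread, because the compiler does not allow a yield return statement inside a try block that has a catch clause.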
    using System;
    using System.Collections.Generic;
    using System.Concurrency;
    using System.Linq;
    using System.Text;
    using System.Threading;

    namespace ConsoleApplication7
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                    var observable = TestEnumerable().ToObservable(Scheduler.NewThread); // Needs to be on a new thread because it contains long-running blocking operations
                    // Use subject because we need many subscriptions to a single data source
                    var subject = new Subject<int>();
                    subject.Subscribe(x => Console.WriteLine("Subscriber1: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                                      x => Console.WriteLine("Subscriber1 ERROR: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                                      () => Console.WriteLine("Subscriber1 Finished" + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId));
                    subject.Subscribe(x => Console.WriteLine("Subscriber2: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                                      x => Console.WriteLine("Subscriber2 ERROR: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                                      () => Console.WriteLine("Subscriber2 Finished" + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId));
                    Console.WriteLine("Press key to start receiving data");
                    Console.ReadKey();
                    var sub = observable.Subscribe(subject);
                    Console.WriteLine("Press key to exit");
                    Console.ReadKey();
                    sub.Dispose();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Caught exception on main thread");
                }
            }

            public static IEnumerable<int> TestEnumerable()
            {
                while (true)
                {
                    yield return 1;
                    Thread.Sleep(200);
                    yield return 2;
                    Thread.Sleep(200);
                    yield return 3;
                    Thread.Sleep(200);
                    throw new InvalidOperationException();
                }
            }
        }
    }
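On the yield return restriction mentioned in the question: the compiler only rejects yield return inside a try block that has a catch clause, so one possible workaround is to call MoveNext inside the try/catch and yield outside it. The following is a minimal sketch of that idea, not part of the original post. The names CatchErrors, Faulty and SafeEnumerableDemo are hypothetical, and the sketch ends the sequence after reporting the error instead of rethrowing it.

```csharp
using System;
using System.Collections.Generic;

public static class SafeEnumerableDemo
{
    // Hypothetical helper: forwards items from source. If the source throws
    // while advancing, the exception is passed to onError and the sequence ends.
    public static IEnumerable<T> CatchErrors<T>(IEnumerable<T> source, Action<Exception> onError)
    {
        // 'using' expands to try/finally, which may legally contain yield return.
        using (var enumerator = source.GetEnumerator())
        {
            while (true)
            {
                T item;
                try
                {
                    // Only the advance step sits inside try/catch.
                    if (!enumerator.MoveNext())
                        break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    onError(ex);
                    break;
                }
                // The yield happens outside the try/catch, so it compiles.
                yield return item;
            }
        }
    }

    // Stand-in for the blocking network source in the question.
    public static IEnumerable<int> Faulty()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("simulated failure");
    }

    public static void Main()
    {
        foreach (var x in CatchErrors(Faulty(), ex => Console.WriteLine("Error: " + ex.Message)))
            Console.WriteLine(x);
        // Prints 1, 2, then "Error: simulated failure"
    }
}
```

The onError callback runs on whichever thread is enumerating the sequence, so this only contains the failure; it does not by itself move the exception to another thread.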
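Best answer
The solution depends on whether you have a Dispatcher/SynchronizationContext available. If you do, using it is certainly the better option.
Solution 1: A Dispatcher/SynchronizationContext is available
(i.e. you are using WPF, Windows Forms, or a custom dispatcher loop)
You can use ObserveOn + Catch to move the error back onto the dispatcher thread. I have seen this used in a WPF application and it worked well.
How you pass your IScheduler/DispatcherScheduler around is up to you (we used IoC).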
    // Extension methods must be declared in a static class; the class name is arbitrary
    public static class ObservableExtensions
    {
        // Re-raises any error from source on the given scheduler
        public static IObservable<T> CatchOn<T>(this IObservable<T> source,
            IScheduler scheduler)
        {
            return source.Catch<T, Exception>(ex =>
                Observable.Throw<T>(ex).ObserveOn(scheduler));
        }

        // We didn't use it, but this overload could be useful if the dispatcher is
        // known at the time of execution, since it's an optimised path
        public static IObservable<T> CatchOn<T>(this IObservable<T> source,
            DispatcherScheduler scheduler)
        {
            return source.Catch<T, Exception>(ex =>
                Observable.Throw<T>(ex).ObserveOn(scheduler));
        }
    }
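To show how CatchOn might be called, here is a minimal usage sketch. It makes several assumptions that are not in the original answer: it targets the current System.Reactive package (so the namespaces and NewThreadScheduler.Default differ from the older System.Concurrency API used in the question), it uses an EventLoopScheduler as a stand-in for a real dispatcher in a console program, and the names CatchOnDemo and Numbers are hypothetical. The CatchOn method is repeated only so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class CatchOnDemo
{
    // Same extension as in the answer, repeated so this sketch is self-contained.
    public static IObservable<T> CatchOn<T>(this IObservable<T> source, IScheduler scheduler)
    {
        return source.Catch<T, Exception>(ex =>
            Observable.Throw<T>(ex).ObserveOn(scheduler));
    }

    // Stand-in for the blocking network source.
    public static IEnumerable<int> Numbers()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("simulated network failure");
    }

    public static void Main()
    {
        // Assumption: a dedicated event loop thread plays the role of the dispatcher.
        using (var handlerLoop = new EventLoopScheduler())
        {
            var done = new ManualResetEvent(false);

            Numbers()
                .ToObservable(NewThreadScheduler.Default) // enumerate on a separate thread
                .CatchOn(handlerLoop)                     // errors hop to the event loop thread
                .Subscribe(
                    x => Console.WriteLine("Value " + x + " on thread " + Thread.CurrentThread.ManagedThreadId),
                    ex =>
                    {
                        Console.WriteLine("Error '" + ex.Message + "' on thread " + Thread.CurrentThread.ManagedThreadId);
                        done.Set();
                    },
                    () => done.Set());

            // Keep the process alive until the sequence terminates.
            done.WaitOne();
        }
    }
}
```

Only the error notification is moved: values are still delivered on the thread that enumerates the source. In a WPF application the scheduler argument would instead be the DispatcherScheduler for the UI thread.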
Solution 2: No dispatcher available
Instead of using Console.ReadKey(), use a ManualResetEvent and wait on it, then rethrow the captured error on the main thread:
    static void Main(string[] args)
    {
        try
        {
            Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
            var observable = TestEnumerable().ToObservable(Scheduler.NewThread); // Needs to be on a new thread because it contains long-running blocking operations
            // Use subject because we need many subscriptions to a single data source
            var subject = new Subject<int>();
            // Written by the error handlers, read by the main thread after the wait
            Exception exception = null;
            // Signalled when the sequence terminates so the main thread stops waiting
            ManualResetEvent mre = new ManualResetEvent(false);
            using (subject.Subscribe(
                x => Console.WriteLine(x),
                ex => { exception = ex; mre.Set(); },
                () => { Console.WriteLine("Subscriber1 Finished"); mre.Set(); }))
            using (subject.Subscribe(
                x => Console.WriteLine(x),
                ex => { exception = ex; mre.Set(); },
                () => { Console.WriteLine("Subscriber2 Finished"); mre.Set(); }))
            using (observable.Subscribe(subject))
            {
                mre.WaitOne();
            }
            // Rethrow on the main thread so the catch block below handles it
            if (exception != null)
                throw exception;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Caught exception on main thread");
        }
    }
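One caveat with "throw exception;" is that it resets the stack trace to the rethrow site. On .NET Framework 4.5 or later, ExceptionDispatchInfo can capture the exception on the worker thread and rethrow it elsewhere with the original trace preserved. The sketch below illustrates this with a plain thread instead of Rx. It is an assumption-laden variation, not part of the original answer, and the names RethrowDemo and RunAndRethrow are hypothetical.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading;

public static class RethrowDemo
{
    // Hypothetical helper: runs work on a new thread, blocks until it ends,
    // then rethrows any failure on the calling thread.
    public static void RunAndRethrow(Action work)
    {
        ExceptionDispatchInfo captured = null;
        var finished = new ManualResetEvent(false);

        var worker = new Thread(() =>
        {
            try
            {
                work();
            }
            catch (Exception ex)
            {
                // Capture keeps the original stack trace with the exception.
                captured = ExceptionDispatchInfo.Capture(ex);
            }
            finally
            {
                finished.Set();
            }
        });
        worker.Start();
        finished.WaitOne();

        // Back on the calling thread: rethrow without resetting the stack trace.
        if (captured != null)
            captured.Throw();
    }

    public static void Main()
    {
        try
        {
            RunAndRethrow(() =>
            {
                throw new InvalidOperationException("simulated failure on worker thread");
            });
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Caught on main thread: " + ex.Message);
            // Prints "Caught on main thread: simulated failure on worker thread"
        }
    }
}
```

In the Rx version above, the same idea would mean storing ExceptionDispatchInfo.Capture(ex) in the error handlers and calling Throw() after mre.WaitOne() returns.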
Source: the Stack Overflow question "c# - Marshalling exceptions across threads (Reactive Extensions)": https://stackoverflow.com/questions/5146345/