asynchronous - 接收 : subscribe with async function and ignore errors

标签 asynchronous system.reactive

我想为可观察对象中的每个项目调用一个异步函数。正如回答here ,解决方案是使用 SelectMany。但是,如果异步方法抛出异常,订阅将终止。我有以下解决方案,它似乎有效:

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .Catch(Observable.Empty<string>()));

是否有更惯用的解决方案?

最佳答案

有一种标准方法可以观察 RunAsync 中发生的异常打电话,那是使用 .Materialize() .

.Materialize()方法变成一个 IObservable<T>序列为 IObservable<Notification<T>>您可以对 OnNext 进行推理的序列, OnError , 和 OnCompleted电话。

我写了这个查询:

var obs = Observable.Range(0, 10);

obs
    .SelectMany(x =>
        Observable
            .FromAsync(() => RunAsync())
            .Materialize())
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
    .Subscribe(x => x.Dump());

有了这个支持代码:

private int counter = 0;
private Random rnd = new Random();

private System.Threading.Tasks.Task<string> RunAsync()
{
    return System.Threading.Tasks.Task.Factory.StartNew(() =>
    {
        System.Threading.Interlocked.Increment(ref counter);
        if (rnd.NextDouble() < 0.3)
        {
            throw new Exception(counter.ToString());
        }
        return counter.ToString();
    });
}

当我运行它时,我得到这样的输出:

2
4
5
1!
6
7
3!
10
8!
9

! 结尾的每一行正在调用 RunAsync这导致了异常。

关于asynchronous - 接收 : subscribe with async function and ignore errors,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29427680/

相关文章:

javascript - 使用 JavaScript 更改外部脚本中包含的 HTML

c# - 等待和异步属性中的方法错误 'Cannot await ' System.Threading.Tasks.Task'

java - 如何使用 Netty 异步发送多个 http 请求?

.net - 如何在 RX 中实现超时缓冲

c# - 我可以使用 Reactive Extensions 来控制仪器测试周期的时间吗?

c# - 在基于 Rx 计数的聚合中,将计数重置为超过最大时间间隔

system.reactive - 为什么 RX 的 BehaviorSubject 中的名称为 "Behavior"?

.net - 立即使用Rx框架按时间间隔启动任务

node.js - 可以在 Node 全局范围内使用await来加载数据库客户端吗?

javascript - Selenium:如何编写同步 Selenium 自动化?