c# - 响应式扩展 SelectMany 和 Concat

标签 c# system.reactive reactive-programming rxjs

我理解 SelectMany 的行为是有效地将产生的每个值的结果合并到一个流中,因此排序是不确定的。

我如何在 C# 中的 RxJs 中执行类似于 concatAll 的操作。

var obs = Observable.Range (1, 10).SelectMany (x => {
return Observable.Interval (TimeSpan.FromSeconds(10 - x)).Take (3);
}).Concat();

这实际上是我想要做的,给定一个范围,等待每个范围,然后按照它们开始的顺序进行连接。显然这是一个玩具示例,但想法就在那里。

布莱尔

最佳答案

使用 Select , 不是 SelectMany . Concat您要使用的重载适用于 IObservable<IObservable<T>> , 所以简单地投影内部序列,不要展平它们。

var obs = Observable.Range(1, 10)
                    .Select(x => Observable.Interval(TimeSpan.FromSeconds(10 - x)).Take(3))
                    .Concat();

请注意每个 Interval 的订阅使用 Concat 延迟;即第一个 Interval当您订阅时立即开始,但所有剩余的间隔都是在没有订阅的情况下生成和排队的。这不像Concat将订阅所有内容,然后稍后以正确的顺序重播这些值。

关于c# - 响应式扩展 SelectMany 和 Concat,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26300072/

相关文章:

c# - 调用 OnPaint 时 Infragistics UltraGrid 上的 "Index was outside the bounds of the array"

c# - 在应用程序中存储密码

c# - Cold observable 的 Scheduler.CurrentThread - 它将在哪个线程中运行?

javascript - 条件链可观察

android - RxJava - 如何在另一个流等待第一个项目时缓冲流中的所有项目?

c# - UWP/.NET Native 和 iOS 上的 Protobuf-net

c# - Observable.FromEvent 和 CreateDelegate 参数映射

c# - 在 Reactive Extensions 中如何将项目缓冲到组中?

java - 如何使用 Reactor 的 StepVerifier 来验证 Mono 是否为空?

javascript - 未捕获的 TypeError : undefined is not a function when ASP. NET 客户端访问服务