reactive-programming - Concat 运算符语义,但可以立即订阅所有不可靠的可观察对象

标签 reactive-programming rx-java

我想连接一个冷的和一个热的可观察量。也就是说,resulting observable 应该首先发出冷 observable 的结果,然后是热 observable 的结果。同时,我希望订阅第二个 observable,即 hot,在订阅第一个 observable 的同时发生,否则我会错过一个重要事件。

这看起来与 merge 所做的非常相似。但我想保证热 observable 在冷 observable 完成之前不会推送任何东西,merge 不能保证。解决这个问题的正确方法是什么?

最佳答案

根据您的需要使用ReplayPublishLast 运算符。每个都有一个接受选择器函数的重载。

例如:

var coldThenHot = hot.PublishLast(cold.Concat);

订阅coldThenHot 会导致PublishLast 首先调用选择器,创建Concat 查询。然后它订阅它和你的 hot observable。 hot observable 中的最后一个值被缓冲。当 cold observable 完成时,序列继续使用缓冲值,或者只是保持静默直到最后一个值到达。

但是,我很好奇您所说的到底是什么意思。如果您的 hot observable 在您订阅之前不会生成值,那么 technically it's cold .如果您的可观察对象确实热门,那么在创建此查询时您可能已经错过了该值。虽然,它可能已经被隐式缓冲(例如,如果它是由 Observable.FromAsyncPattern 创建的),在这种情况下,只需像正常一样连接序列。

var coldThenHot = cold.Concat(hot);

关于reactive-programming - Concat 运算符语义,但可以立即订阅所有不可靠的可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24607060/

相关文章:

java - RxJava 中的 concatMap 和 flatMap 有什么区别

.net - 证明其有用性的.NET Reactive Framework示例

java - SubscibeOn 对主题没有影响

java - RxJava - 获取列表中的每一项

meteor - Meteor js中Deps和Tracker的区别

rx-java - 使用RxJava构建异步REST API

java - RxJava - 合并两个 Observable

android - RxJava android 链接请求

system.reactive - 使用异步模式 (queue.BeginReceive,queue.EndReceive) 为 MSMQ 消息接收使用响应式(Reactive)扩展 (Rx)

javascript - RxJS:延迟可观察项的生成