目前,我正在使用 RX 框架来实现类似工作流的消息处理管道。本质上,我有一个消息生成器(反序列化网络消息并在主题上调用 OnNext())并且我有几个消费者。
注意:If 和 transform 是我编写的扩展方法,它们只返回一个 IObservable。
消费者做了类似下面的事情:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
然后被另一个类似的管道使用,并且一直持续到顶部,在有人调用最终管道上的 Subscribe()
时结束。我遇到的问题是来自 base 的消息不会向上传播,除非在某处直接对消息调用订阅。
如何将消息推送到堆栈顶部?我知道这是一种非正统的方法,但我觉得它使代码非常容易理解消息发生的情况。如果您觉得这是一个非常糟糕的想法,谁能建议另一种方法来做同样的事情?
最佳答案
如果没有订阅者,他们为什么要通过管道?如果您的中间步骤之一对它们的副作用有用(即使没有其他订阅者,您也希望它们运行),您应该重写副作用操作以成为订阅者。
如果您想继续链条,您也可以将带有副作用的步骤作为传递操作(或 tee,如果您愿意的话)。
关于c# - 作为管道的 RX IObservable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2243324/