c# - 作为管道的 RX IObservable

标签 c# .net workflow system.reactive pipeline

目前,我正在使用 RX 框架来实现类似工作流的消息处理管道。本质上,我有一个消息生成器(反序列化网络消息并在主题上调用 OnNext())并且我有几个消费者。

注意:If 和 transform 是我编写的扩展方法,它们只返回一个 IObservable。

消费者做了类似下面的事情:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest 然后被另一个类似的管道使用,并且一直持续到顶部,在有人调用最终管道上的 Subscribe() 时结束。我遇到的问题是来自 base 的消息不会向上传播,除非在某处直接对消息调用订阅。

如何将消息推送到堆栈顶部?我知道这是一种非正统的方法,但我觉得它使代码非常容易理解消息发生的情况。如果您觉得这是一个非常糟糕的想法,谁能建议另一种方法来做同样的事情?

最佳答案

如果没有订阅者,他们为什么要通过管道?如果您的中间步骤之一对它们的副作用有用(即使没有其他订阅者,您也希望它们运行),您应该重写副作用操作以成为订阅者。

如果您想继续链条,您也可以将带有副作用的步骤作为传递操作(或 tee,如果您愿意的话)。

关于c# - 作为管道的 RX IObservable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2243324/

相关文章:

c# - 实时磁贴更新的最佳选择

c# - _ 在函数和变量名中是什么意思?

.net - 在两个 .net 应用程序之间共享 "session state"类型数据的最佳方式

java - 工作流程:如何使用序列代替 foreach?

python - 在 Python 中处理一个简单的工作流

c# - 从剪贴板中获取字符串?没有组装引用吗?

c# - 添加“阅读更多”按钮以查看 Gridview 描述文本

.net - 如何以编程方式关闭 VB.NET 中的 ComboBox 下拉菜单?

c# - 有没有办法阻止这段代码每次都打开一个新的浏览器窗口?

svn - 使用 Subversion 为 2-3 人的 Web 开发设置完美的工作流程