c# - 如何关联函数输入和输出?

标签 c# system.reactive

考虑下面的简单程序。它有一个可观察的整数和一个函数来计算最近发布的整数是偶数还是奇数。出乎意料的是,程序会在报告数字发生变化之前报告最近的数字是否为偶数/奇数。

static void Main(string[] args) {
    int version = 0;
    var numbers = new Subject<int>();
    IObservable<bool> isNumberEven = numbers.Select(i => i % 2 == 0);
    isNumberEven
        .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
        .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));
    numbers
        .Select(i => new { Number = i, Version = Interlocked.Increment(ref version) })
        .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.Number}"));
    numbers.OnNext(1);
    numbers.OnNext(2);
    numbers.OnNext(3);
    Console.ReadLine();
}

输出是:

Time 1 : False
Time 2 : 1
Time 3 : True
Time 4 : 2
Time 5 : False
Time 6 : 3

我认为更改数字会引发一连串的下游影响,并且这些影响会按照发生的顺序进行报告。交换订阅顺序将交换报告结果的方式。我知道 rx 是异步的,事情有可能以不确定的顺序发生。如果我在我的函数中使用 .Delay() 或网络调用,我无法确定何时会报告结果。但在这种情况下,我感到非常惊讶。

为什么这很重要?我认为这意味着如果我想尝试关联函数的输入和输出(比如打印发布的数字以及它们是偶数还是奇数),我必须在输出结果中包含输入参数,如下所示:

var isNumberEven = numbers.Select(i => new {
    Number = i,
    IsEven = i % 2 == 0
});

我想我可以构建一堆简单的小函数,然后使用 rx 运算符组合它们来完成复杂的计算。但也许我不能使用 rx 运算符来组合/加入/关联结果。在定义每个函数时,我必须自己关联输入和输出。

在某些情况下,我可以使用 rx 运算符关联结果。如果每个输入都产生一个输出,我可以压缩这两个。但是一旦您执行了 Throttle input 之类的操作,它就不再起作用了。

这个版本的程序似乎确实以合理的方式报告数字是偶数还是奇数。

static void Main(string[] args) {
    var numbers = new Subject<int>();
    var isNumberEven = numbers.Select(i => i % 2 == 0);
    var publishedNumbers = numbers.Publish().RefCount();
    var report =
        publishedNumbers
        .GroupJoin(
            isNumberEven,
            (_) => publishedNumbers,
            (_) => Observable.Empty<bool>(),
            (n, e) => new { Number = n, IsEven = e })
        .SelectMany(i => i.IsEven.Select(j => new { Number = i.Number, IsEven = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Number} {(i.IsEven ? "even" : "odd")}"));
    numbers.OnNext(1);
    numbers.OnNext(2);
    numbers.OnNext(3);
    Console.ReadLine();
}

输出如下:

1 odd
2 even
3 odd

但我不知道这是一个幸运的巧合还是我是否可以依靠它。 Rx 中的哪些操作以确定的顺序发生?哪些是不可预测的?我是否应该定义所有函数以在结果中包含输入参数?

最佳答案

您的第一个程序的行为完全符合我的预期,而且是确定性的。

I understand that rx is asynchronous and it is possible for things to happen in non-deterministic order.

如果您引入非确定性行为(如并发/调度),事情只会以非确定性顺序发生,否则 Rx 是确定性的。

这里有几个问题/误解。 1) 可变外部状态 - version 2) 主题的使用(但在本示例中根本不是问题) 3) 对回调如何发出的误解。

让我们只关注 3)。如果我们将您的代码解包到其基本调用站点,您可能会发现 Rx 在幕后是多么简单。

numbers.OnNext(1); 主题将查找它的订阅和 OnNext 每个订阅的顺序。

IObservable<bool> isNumberEven = numbers.Select(i => i % 2 == 0);
isNumberEven
    .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
    .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));

也可以简化为

numbers.Select(i => i % 2 == 0)
    .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
    .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));

有人可能会争辩说,由于 isNumberEven 从未在其他任何地方使用过,您应该将其缩减为这个。

所以我们可以看到我们有第一个订阅者。 实际上,它将运行的代码是这样的

private void HandleOnNext(int i)
{
    var isEven = i % 2 == 0
    var temp = new { IsEven = isEven , Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp .Version} : {temp .IsEven}");
}

我们的第二个订阅者(因为 .Subscribe( 方法是在偶数订阅之后调用的),是 numbers 订阅者。 他的代码可以有效地归结为

private void HandleOnNext(int i)
{
    var temp = new { Number = i, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.Number}");
}

所以一旦你完全解构了代码,你基本上会得到这个

void Main()
{
    int version = 0;

    //numbers.OnNext(1);
    ProcessEven(1, ref version);
    ProcessNumber(1, ref version);
    //numbers.OnNext(2);
    ProcessEven(2, ref version);
    ProcessNumber(2, ref version);
    //numbers.OnNext(3);
    ProcessEven(3, ref version);
    ProcessNumber(3, ref version);
}

// Define other methods and classes here
private void ProcessEven(int i, ref int version)
{
    var isEven = i % 2 == 0;
    var temp = new { IsEven = isEven, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.IsEven}");
}
private void ProcessNumber(int i, ref int version)
{
    var temp = new { Number = i, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.Number}");
}

一旦所有的回调和订阅都被具体化了,你就会发现这并不是什么神奇的事情,一切都是确定性的。

Should I be defining all my functions to include the input parameters in the results?

要回答您的问题(考虑到您对 Rx 的误解,我不愿这样做),您只需要在结果序列的顺序不确定时这样做。 这方面的一个例子可能是您一次发出多个网络请求。 您不能确定他们都会按照您发送给他们的顺序进行回复。 但是,您可以强制这些场景与 Concat

等运算符的使用保持一致

关于c# - 如何关联函数输入和输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38364746/

相关文章:

c# - 在 C# 中使用可选参数作为对象

c# - Linq Group 结果到对象

c# - 是否可以为 System.Windows.Forms.Combobox-Control 设置小于 21 像素的高度

c# - 在 ASP.net 的浏览器中显示来自 Oracle 数据库的 PDF

c# - 没有为实体类型 MyImage 找到合适的构造函数

.net - 如何将动态的 Observable 集合组合成一个新的 Observable?

design-patterns - 使用 RX 通过中介在两个组件之间进行通信

c#-4.0 - 响应式(Reactive)扩展处理事件一次

c# - Observable.Using( ) 取消

c# - .PublishLast()(以前称为 Prune)的用例