c# - 当 C# Reactive Framework 中源流和目标流相同时,SelectMany 如何工作?

标签 c# system.reactive

我有两个 Subject,一个是具有 ID 的人员对象流,另一个是代表谁与谁成为好友的 ID 的外部引用流。这是一些简化的代码。

class Program {
    static void Main() {
        // Set up observables
        var people = new Subject<Person>();
        var friendMap = new Subject<FriendMap>();
        var friendNotices = from p1 in people
                            from p2 in people
                            from pair in friendMap
                            where p1.Id == pair.Id1 && p2.Id == pair.Id2
                            select p1.Name + " befriended " + p2.Name;

        // Subscribe to log
        friendNotices.Subscribe(Console.WriteLine);

        // Add people
        people.OnNext(new Person(1, "Alice"));
        people.OnNext(new Person(2, "Bob"));

        // Add relationships
        friendMap.OnNext(new FriendMap(1, 2)); // => "Alice befriended Bob"
        friendMap.OnNext(new FriendMap(2, 1)); // Doesn't show up!
    }
}

class Person {
    public Person(int id, string name) {
        Id = id;
        Name = name;
    }
    public string Name;
    public int Id;
}

class FriendMap {
    public FriendMap(int id1, int id2) {
        Id1 = id1;
        Id2 = id2;
    }
    public int Id1;
    public int Id2;
}

我遇到的问题是,有时添加外部参照不会导致 friendNotice 事件。特别是,如果具有 Id2 的人员是在具有 Id1 的人员之前创建的,则似乎会失败。

这是 Rx 中的错误还是我的代码中的错误?不管怎样,我该如何让它发挥作用?

(“交友”在我的应用程序中是不可交换的 - Alice 与 Bob 交友与 Bob 与 Alice 交友是不同的关系,因此“只需交换 ID 并重试”在我的情况下并不是一个可用的解决方案)。

最佳答案

这里的问题是对 SelectMany 函数如何工作的误解(这是 linq 理解 from ... from ... 映射到的运算符)。

此构造从源流中获取每个元素并将其投影到目标流上。它通过在目标流上为源的每个元素创建订阅来为投影提供服务来实现此目的。

让我们使用一些伪代码来检查这一点。仅考虑查询的这一部分:

from p1 in people
from p2 in people

目前,我们将人员流分为 peopleA 和 peopleB:

from p1 in peopleA
from p2 in peopleB

如果我们现在调用:

peopleA.OnNext(Alice);

实际发生的情况是,将在 peopleB 上创建一个新订阅,以服务 Alice 到 peopleB 的投影。此时,peopleB 中没有任何元素 - 因此不会发生投影。

现在如果我们调用:

peopleB.OnNext(Tom);

Alice -> peopleB 的投影将运行并输出 (Alice, Tom)。

现在调用:

peopleB.OnNext(Dick);

现在 Alice -> peopleB 的投影继续,因此 (Alice, Dick) 将被输出。

现在调用:

peopleA.OnNext(Bob);

现在,Bob 在 peopleB 上开始了新的订阅 - 但在 peopleB 发出之前不会输出任何内容。

现在调用:

peopleB.OnNext(Harry);

随着 Alice 和 Bob 订阅的运行,我们将得到 (Alice, Harry) 和 (Bob, Harry);

一个简单的例子供您尝试:

var source = new Subject<string>();
var source2 = new Subject<string>();

var res = from s in source
          from t in source2
          select s + " " + t;

res.Subscribe(Console.WriteLine);

source.OnNext("A"); 
source2.OnNext("1");
source2.OnNext("2");
source.OnNext("B");
source2.OnNext("3");

将给出输出:

A 1
A 2
A 3
B 3

回到“self SelectMany”。现在一切开始变得有点棘手。关键部分是正在设置的订阅不会捕获触发其设置的“当前”项目。因此,让我们标记 SelectMany A 和 B 的每个部分:

from p1 in people (call this A)
from p2 in people (call this B)

当我们调用时:

people.OnNext(Alice);

A 上的 Alice 订阅将在 B 上进行 - 但由于它是在 Alice 发出后进行的,因此不会捕获她,也不会输出任何内容。

现在我们调用:

people.OnNext(Bob);

Alice 的订阅将在 B 上看到 Bob,这会导致输出 (Alice, Bob)。将在 B 上创建 A 上 Bob 的订阅,但同样不会输出任何内容,因为它错过了 Bob 的输出。

这就是你所看到的。发出的唯一组合是(Alice,Bob)。

您可以通过在设置新订阅时让人们重播其内容来解决此问题。像这样修改示例的第一部分:

    // Set up observables
    var people = new Subject<Person>();
    var peopleR = people.Replay().RefCount();
    var friendMap = new Subject<FriendMap>();
    var friendNotices = from p1 in peopleR
                        from p2 in peopleR
                        from pair in friendMap
                        where p1.Id == pair.Id1 && p2.Id == pair.Id2
                        select p1.Name + " befriended " + p2.Name;

...但是如果您的流长时间运行,这当然是不可取的,因为您将所有内容缓存在内存中。

就你的具体问题而言,我认为你最好采取不同的方式 - 但如果不知道你想要实现什么目标,就很难做出规定,而且这已经是一篇很长的文章了!一种方法是简单订阅 FriendMap 条目,然后查找 friend 的姓名来输出您想要的消息。 (我可能错过了大局中的一些东西!)。

希望您能理解您的方法存在的问题。

关于c# - 当 C# Reactive Framework 中源流和目标流相同时,SelectMany 如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16978592/

相关文章:

c# - DownloadString 跳过换行符

c# - 报告框架背后的基本原理

c# - LINQ to SQL - 确定我是否有过时数据

c# - 将计时器添加到 Windows 窗体应用程序

c# - 获取包含可观察属性的对象列表的单个可观察值?

c# - 接收 : Difference between CurrentThreadScheduler and ImmediateScheduler

c# - 如何对包含文件大小数据的 ListView 列进行排序? C#

scala - 相当于 Scala 中的 LINQ to Events

java - RXJava 结合多个订阅

c# - 围绕一组不断变化的依赖可观察量创建可观察量