c# - 2 个 IObservable 上的响应式(Reactive) linq 表达式

标签 c# .net asynchronous system.reactive

在下面的代码中,如果我正确理解 RX 中的连接,我应该会看到出现以下警报:

  • 西
  • 测试
  • 测试西*
  • 完成

我收到了预期的 4 个警报中的 3 个...为什么我没有收到“Test-West”?

public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent(); 

        var loginInitial = new LoginInitial();
        var loginCheckList = new LoginCheckList();



        var result1 = from x in loginInitial.Status
                    from y in loginCheckList.Status
                    where x == "Test" && y == "West"
                    select new { x, y };

        result1.Subscribe(x => MessageBox.Show(x.x + "-" + x.y));

        var result2 = from x in loginInitial.Status
                      where x == "Test"
                      select x;

        result2.Subscribe(x => MessageBox.Show(x));

        var result3 = from x in loginCheckList.Status
                      where x == "West"
                      select x;

        result3.Subscribe(x => MessageBox.Show(x));

        var task1 = Task.Factory.StartNew(() =>
                                  {
                                      for (int i = 0; i < 10000000; i++)
                                      {
                                          if (i == 9000000)
                                              loginInitial.Status.Publish("9000000");
                                          if (i == 9000001)
                                              loginInitial.Status.Publish("Test");
                                      }
                                  });

        var task2 = Task.Factory.StartNew(() =>
                                  {
                                        for (int i = 0; i < 1000000; i++)
                                        {
                                            if (i == 800000)
                                                loginInitial.Status.Publish("800000");
                                            if (i == 800001)
                                                loginCheckList.Status.Publish("West");
                                        }
                                  });
        Task.WaitAll(task1, task2);

        MessageBox.Show("Done");
    }
}

public class LoginInitial
{
    public PublishObservable<string> Status = new PublishObservable<string>(); 
}

public class LoginCheckList
{
    public PublishObservable<string> Status = new PublishObservable<string>();
}

public class PublishObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _observers = new List<IObserver<T>>();

    public void Publish(T value)
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnNext(value);
            }
        }
    }

    public void Complete()
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnCompleted();
            }
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_observers)
        {
            _observers.Add(observer);
        }
        return null;
    }
}

最佳答案

当您在 Rx 中使用 from 子句时,您是说该子句的其余部分应该针对所有出现的可观察值运行。对于嵌套的 from 子句,这意味着您正在等待第一个事件的第一次出现,然后开始针对该事件运行该子句的其余部分(然后对所有 future 的事件并行执行相同的操作)发生)。您可以找到有关 SelectMany 如何工作的更多信息,例如 here .

当您查看示例时:

var result1 = 
  from x in loginInitial.Status
  from y in loginCheckList.Status
  where x == "Test" && y == "West"
  select new { x, y }; 

...这意味着该子句需要等待loginInitial.Status。当这触发一个值时,它开始等待 loginCheckList.Status。如果我正确理解您的代码,Initial observable 将在 CheckList observable 之后生成一个值,因此当您开始等待第二个值时一、该值已经生成,您将无法再次获取。

我认为在您的情况下更合适的操作是 Observable.ZipCombineLatest (请参阅 thisthis )。

关于c# - 2 个 IObservable 上的响应式(Reactive) linq 表达式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5162308/

相关文章:

c# - 如何从给定日期获取下个月的结束日期

c# - 扩展和过滤 C# 数组

c# - 如果抛出异常,如何从存储过程中检索输出值?

javascript - 如何使 for 循环等待异步函数

c# - 在 Visual Studio 中隐藏区域

c# - 检查字符串的第一个元素是否为正整数

c# - 在 C# 中直接从文件加载字体

c# - 使用 FromAssembly 注册 CaSTLe Windsor 需要一些第 3 方引用的 dll,但不是全部

unit-testing - Dart 单元测试应该在异常时失败

c# - 异步读取输出时外部进程阻塞主线程