c# - RX - Zip 输出意外结果

标签 c# reactive-programming

请帮我理解一个现象:

为什么 X 不等于 Observable 项中的索引?

例如构建 block :

        public class EcgSample
        {               
            public EcgSample(int y)
            {
                Y = y;   
            } 

            public int X { get; set; }
            public int Y { get; set; }  
        }

        private void Print(Tuple<EcgSample, int> s)
        {
              Debug.WriteLine("X : {0} , Y : {1} , Index : {2}", s.Item1.X, s.Item1.Y, s.Item2);
        }

        private List<EcgSample> CreateSamples()
        {
            var testSamples = new List<EcgSample>();

            for (short i = 0; i < 1400; i++)
            {
               testSamples.Add(new EcgSample(i));   
            }

            return testSamples;
        }

可观察示例:(输出预期结果)

       // (1) Create From Collection .
       IObservable<EcgSample> sampleObservable = CreateSamples().ToObservable(new EventLoopScheduler());

       // (2) Repeat 
       IObservable<EcgSample> repeated = sampleObservable.Repeat();

       // (3) Indexed 
       IObservable<Tuple<EcgSample,int>> indexed = repeated.Select((item, index) =>
       {
           item.X = index;
           return new Tuple<EcgSample, int>(item, index);
       }); 

       // (4) Buffered 
       IObservable<IList<Tuple<EcgSample, int>>> buffered = indexed.Buffer(250); 

       // (5) SelectMany and Print .
       _disposable = buffered.SelectMany(buf => buf).Subscribe(Print);

OUTPUT:这是 Observable 序列的预期输出。

       [8384] X : 0 , Y : 0 , Index : 0 
       [8384] X : 1 , Y : 1 , Index : 1 
       [8384] X : 2 , Y : 2 , Index : 2 
       [8384] X : 3 , Y : 3 , Index : 3 
       [8384] X : 4 , Y : 4 , Index : 4 

修改:(不输出非预期结果)

现在我不想在每个时间间隔都使用每个缓冲区:

     // (5) Create an Observable from a Timer. 
     IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
            observer =>
            {
                var timer = new Timer();
                timer.Interval = 250;
                timer.Elapsed += (s, e) => observer.OnNext(e);
                timer.Start();
                return Disposable.Create(() =>
                {
                    timer.Stop();
                });
            });

        // (6) Zip with the buffer observable 
        IObservable<IList<Tuple<EcgSample, int>>> zipped = timerObservable.Zip(buffered, (t, b) => b);

        // (7) SelectMany and Print .
        _disposable = zipped.SelectMany(buf => buf).Subscribe(Print);

输出:这输出了一个意想不到的结果:注意 X 不等于索引。

   [9708] X : 187600 , Y : 0 , Index : 0 
   [9708] X : 187601 , Y : 1 , Index : 1 
   [9708] X : 187602 , Y : 2 , Index : 2 
   [9708] X : 187603 , Y : 3 , Index : 3 

知道为什么 X 从 187600 开始(不用说每次我运行我的程序时这个值都不一样)..?

编辑:

我通过在最后简单地投影解决了这个问题,但我仍然想知道为什么会出现第一个问题。

        List<EcgSample> list = CreateSamples();     

        var loop = new EventLoopScheduler();
        var sampleObservable = list.ToObservable(loop);

        IObservable<EcgSample> reapeted = sampleObservable.Repeat();

        IObservable<IList<EcgSample>> buffered = reapeted.Buffer(250);

        IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
            observer =>
            {
                var timer = new Timer();
                timer.Interval = 250;
                timer.Elapsed += (s, e) => observer.OnNext(e);
                timer.Start();
                return Disposable.Create(() =>
                {
                    timer.Stop();
                });
            });

        IObservable<IList<EcgSample>> zipped = timerObservable.Zip(buffered, (t, b) => b);

        _disposable = zipped.SelectMany(buf => buf).Select((item, index) =>
        {
            item.X = index;
            return new Tuple<EcgSample, int>(item, index);

        }).Subscribe(Print);

最佳答案

您的答案显示了您可以更改的一件事以获得您想要的行为,但这并不是它没有按您预期的方式工作的真正原因。

如果你想将 Observable 中的每个条目与一个数字相关联,你实际上应该将它与一个数字相关联。你这样做的方式,流中的每个元素和数字之间没有实际的联系。您的修复只是确保您在下一个项目通过之前处理每个项目,因此该数字恰好处于正确的值。但这是一个非常不稳定的情况。

如果您只想知道您在直播中进行的项目的运行计数,请查看 the overload of Select that gives you the index :

stream.Select((item, index) => new { item, index })
      .Subscribe(data => Debug.WriteLine("Item at index {0} is {1}", data.index, data.item))

或者,如果您想要的不仅仅是流中的项目计数,您可以这样做:

stream.Select(item => new { item, index = <some value you calculate> })
...

这样你的对象和它的索引就联系在一起了。您可以在未来的任何时候使用该项目的索引,并且仍然知道它的索引是什么。而您的代码依赖于在处理下一个项目之前获取每个项目。

解决问题中的修改

首先,看看Observable.Interval .它可以完成您尝试使用计时器执行的操作,但更容易。

其次,请看下面的示例,它再现了您在问题中所做的事情。运行此代码会产生正确的输出:

var items = Enumerable.Range(65, 26)
                      .Select(i => (char)i)
                      .Repeat();

var observableItems = items.ToObservable()
                           .Select((c, i) => new { Char = c, Index = i });

var interval = Observable.Interval(TimeSpan.FromSeconds(0.25));

var buffered = observableItems.Buffer(10);
var zipped = buffered.Zip(interval, (buffer, _) => buffer);

zipped.SelectMany(buffer => buffer).Dump();

您可以在 LinqPad 中运行该代码,这是探索 Rx(以及 .Net 的其他部分)的非常有用的工具。

最后 - 我假设这是一个简化的练习,试图找出您的情况。看起来您可能正在尝试处理推送比您想要处理的更多更新的传感器数据。将 Zip 与间隔一起使用不会有太大帮助。您会减慢数据到达的速度,但它只会建立越来越大的等待通过 Zip 的数据队列。

如果你想每250毫秒获取一个数据点,看Sample .如果您想一次获得 250 毫秒的读数,请查看 overload of Buffer that takes a timespan instead of a count .

关于c# - RX - Zip 输出意外结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29755137/

相关文章:

c# - 如何实现 MaxOrDefault(x => x.SomeInt) LINQ 扩展?

c# - 如何在 ASP.NET Core 中下载文件?

c# - 克服共享对象线程问题的最佳解决方案

javascript - 使用 RxJS 合并未知数量的可观察对象

rx-java - RxJava flatMap 运算符的不明确行为

c# - 将数据导出到 CSV MVC4

c# - 如何使用 TestScheduler 完成超时行为?

java - Spring WebFlux Reactor - 更新 Flux 中的对象

java - RxJava 过期订阅

swift - 在 Swift 框架中公开响应式(Reactive)可观察量