c# - 节流 IObservable 导致观察者未被调用

标签 c# system.reactive

我有这个简单的代码,它:

  • 创建 IObservable
  • 采样半秒
  • 使用 ThreadPool 调度程序订阅它
  • 使用 SynchronizationContext 观察它

代码如下:

private void DisplayPoints()
{
    var x = 0;
    var ob = this.GeneratePoints();
    ob
      .Sample(TimeSpan.FromMilliseconds(500))
      .SubscribeOn(ThreadPoolScheduler.Instance)
      .ObserveOn(SynchronizationContext.Current)
      .Subscribe(d => Console.WriteLine(d));
}

private IObservable<double> GeneratePoints()
{
    return Observable.Create<double>(o => this.GeneratePoints(o));
}

private IDisposable GeneratePoints(IObserver<double> observer)
{
    var i = 0;
    while (true)
    {
        var value = random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1))));

        observer.OnNext(value);

        i++;
    }

    return Disposable.Empty;
}

但是,控制台上不会输出任何内容(即永远不会调用匿名观察者)。如果我删除 Sample 运算符,则会调用观察者,尽管这种行为显然不是预期的(UI 线程将受到轰炸)。

我显然在这里遗漏了一些东西。我的目的是生成数据,通过 IObserver 推送它,并通过 UI 显示其中的一些数据。

编辑:由于有些人误解了我的意图(即使上面已经明确说明),我应该重申,我正在尝试做的事情:

  • 使用算法生成一些数据(double 值似乎足以解决我的问题)
  • 在 GUI 中显示数据

使用 IObservable 和 Reactive Extensions 似乎是解决我的问题的好方法。

重复一遍:我不会在真实代码中返回随机数 - 这只是一个占位符 让我的预期行为发挥作用。

最佳答案

您可能不想在紧密循环中生成随机数。最好使用一个时间间隔。下面每 200 毫秒生成一次随机数。

IObservable<double> observable =
     Observable.Interval(TimeSpan.FromMillSeconds(200))
          .Select((t,i) => random.Next(0, 100) 
                      * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))))

Enigmativity 为您编写的代码实际上也是紧密循环。他关于您在订阅过程中推出值(value)的错误的观点也是正确的。您必须对代码进行最少的更改才能使其正常工作。

    private static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var i = 0;
            var random = new Random();
            while ( true )
            {
                token.ThrowIfCancellationRequested();

                var value = random.Next(0, 100) * ( 1 / ( double ) random.Next(1, Math.Min(50, Math.Max(i, 1))) );

                observer.OnNext(value);

                i++;
            }
        });
    }

一段时间后

    Observable.Create<double>((observer, token) => GeneratePoints(observer, token));

注意正在传递的取消标记。当序列的订阅者取消订阅时,将设置此 token 并且循环将终止。

然而,这是很多工作,而且 Enigmativities 的答案更简单,并且为您抽象了上面的代码。对于更复杂的情况,了解如何手动执行此操作仍然很有用。

关于c# - 节流 IObservable 导致观察者未被调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22056023/

相关文章:

c# - 使用Rx Start, Retry, Delay, 等待同步文件删除重试

c# - 在 Rx 中,为什么 When 给我过时的元素

c# - 如何从 MVC Controller 打开新窗口?

c# - Ninject:如何根据目标程序集绑定(bind)接口(interface)

c# - 切换 linq 语法

c# - IQbservable 示例

windows-phone-8 - 接收 2.1 : How to subscribe and observe on Dispatcher correctly

c# - Naudio float Sample 和 Concentus.OggFile short Sample,将 mp3 转换为 opus ogg

C# - 在 block 末尾自动释放内存

.net - 可以将同一驱动器上的文件操作批处理到同一线程的线程调度程序?