c# - 响应式扩展 - 属性相互更新

标签 c# asynchronous system.reactive

我有 2 个 DecimalUpDown 控件,num_one 和 num_two,分别绑定(bind)到属性 First 和 Second。当 First 改变时,它会联系服务器计算 Second 的值,反之亦然。异步触发服务器调用释放了 UI,但在快速触发(例如滚轮)时,最后一个请求并不总是最后一个返回,因此值可能会变得不同步。

使用 Reactive 我试图限制调用以仅在用户停止更改一段时间后才触发服务器调用。问题是,当您在更新期间进行更改时,属性更改开始相互触发并根据 Throttle 的 TimeSpan 来回卡住。

    public MainWindow()
    {
        InitializeComponent();

        DataContext = this;

        Observable.FromEventPattern<RoutedPropertyChangedEventHandler<object>, RoutedPropertyChangedEventArgs<object>>(h => num_one.ValueChanged += h, h => num_one.ValueChanged -= h)
            .Throttle(TimeSpan.FromMilliseconds(100), Scheduler.ThreadPool)
           .Subscribe(x =>
           {
               Thread.Sleep(300); // simulate work
               Second = (decimal)x.EventArgs.NewValue / 3.0m;
           });

        Observable.FromEventPattern<RoutedPropertyChangedEventHandler<object>, RoutedPropertyChangedEventArgs<object>>(h => num_two.ValueChanged += h, h => num_two.ValueChanged -= h)
            .Throttle(TimeSpan.FromMilliseconds(100), Scheduler.ThreadPool)
           .Subscribe(x =>
           {
               Thread.Sleep(300); // simulate work
               First = (decimal)x.EventArgs.NewValue * 3.0m;
           });
    }

    private decimal first;
    public decimal First
    {
        get { return first; }
        set
        {
            first = value;
            NotifyPropertyChanged("First");
        }
    }

    private decimal second;
    public decimal Second
    {
        get { return second; }
        set
        {
            second = value;
            NotifyPropertyChanged("Second");
        }
    }

最佳答案

有一个内置的 Rx 运算符可以帮助您在不使用 Throttle 的情况下完全按照您的意愿行事。和超时 - 这是 Switch运营商。

Switch运算符不适用于 IObservable<T>所以大多数时候你永远不会在智能感知中看到它。

相反,它在 IObservable<IObservable<T>> 上运行- 可观察数据流 - 并将源扁平化为 IObservable<T>通过不断切换到最新的 observable 产生(并忽略之前 observables 的任何值)。它仅在外部 Observable 完成而不是内部 Observable 完成时完成。

这正是您想要的 - 如果发生新的值更改,则忽略任何先前的结果并仅返回最新的结果。

这是如何做到的。

首先,我将令人讨厌的事件处理代码移除到几个可观察对象中。

var ones =
    Observable
        .FromEventPattern<
            RoutedPropertyChangedEventHandler<object>,
            RoutedPropertyChangedEventArgs<object>>(
            h => num_one.ValueChanged += h,
            h => num_one.ValueChanged -= h)
        .Select(ep => (decimal)ep.EventArgs.NewValue);

var twos =
    Observable
        .FromEventPattern<
            RoutedPropertyChangedEventHandler<object>,
            RoutedPropertyChangedEventArgs<object>>(
            h => num_two.ValueChanged += h,
            h => num_two.ValueChanged -= h)
        .Select(ep => (decimal)ep.EventArgs.NewValue);

您的代码似乎有点困惑。我假设 DecimalUpDown 的值控件是返回结果的服务器函数的输入。下面是将调用服务器的函数。

Func<decimal, IObservable<decimal>> one2two = x =>
    Observable.Start(() =>
    {
        Thread.Sleep(300); // simulate work
        return x / 3.0m;
    });

Func<decimal, IObservable<decimal>> two2one = x =>
    Observable.Start(() =>
    {
        Thread.Sleep(300); // simulate work
        return x * 3.0m;
    });

显然,您在这两个函数中放入了实际的服务器代码调用。

现在连接最终的可观察对象和订阅几乎是微不足道的。

ones
    .DistinctUntilChanged()
    .Select(x => one2two(x))
    .Switch()
    .Subscribe(x =>
    {
        Second = x;
    });

twos
    .DistinctUntilChanged()
    .Select(x => two2one(x))
    .Switch()
    .Subscribe(x =>
    {
        First = x;
    });

DistinctUntilChanged确保我们只在值实际发生变化时才进行调用。

然后很容易调用这两个服务器函数,执行Switch并仅返回最新的结果,然后将其分配给该属性。

你可能需要在这里或那里弹出一个调度程序和一个 ObserveOn将订阅转移到 UI 线程,否则此解决方案应该可以很好地工作。

关于c# - 响应式扩展 - 属性相互更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11247013/

相关文章:

c# - 并行性,几千个线程

c# - 如何覆盖 express js 默认的 Keep Alive 行为

asynchronous - 为什么在以下程序中在同步之前调用异步功能?

c# - 这段代码对于使用 Rx-Core 的 UdpClient 是否正确

c# - IEnumerable 到具有间隔的 Observable

c# - 用于 silverlight 4 + WCF 数据服务的 IAsyncRepository 或 IObservableRepository

c# - 原始流有数据,Deflate 返回零字节

c# - TagHelper ModelExpression 元数据中不存在描述属性文本

.net - 我在哪里可以找到 ThreadPool.SwitchTo 方法?

javascript - NodeJS。异步。平行线。相同的功能