c# - 使用 Rx 限制非异步调用的回调

标签 c# c#-4.0 system.reactive

我的问题类似于this问题,但一如既往地略有不同。

我目前正在开发一个项目,该项目使用简单的模式来异步接收底层数据更改的通知。

Foo 类负责订阅这些数据更改,并为类提供一种方法,通过注册实现给定接口(interface)的类的实例来注册它们对这些更改的兴趣:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

Bar 类实现此回调并注册自身:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

理想情况下,我们希望限制这些回调,因为我们可以获得一个回调,该回调将特定数据 block 的值从 x 更改为 y,然后在一秒的时间内返回到 x。查看 Rx 文档,通过使用 DistinctUntilChanged 运算符,这将是微不足道的。

但是,问题是如何创建一个 IObservable 集合,然后我可以将运算符应用于上面的回调实现。文档非常清楚地说明了如何从标准 .Net 事件或 Begin.../End... 方法对创建 IObservable。

更新:正如 Richard Szalay 在评论中指出的那样,我需要将 DistinctUntilChanged 与 Throttle 结合使用来实现我的需求。

再次感谢理查德,我还应该提到我需要取消订阅回调的能力。在当前模型中,我只需在 Foo 的实例上调用 Unscubscribe(Guid subscriptionToken) 即可。

最佳答案

我想到的唯一方法是使用 ISomeCallbackInterfaceObservable.Create 的自定义实现。尽管如此,这个解决方案还是让人感觉很勉强:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subject = new Subject<
            Tuple<string,IEnumerable<Tuple<string, int, object>>>();

        var callback = new ObservableSomeCallback(subject);

        Guid subscription = publisher.SubscribeToChanges(callback);

        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}

private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer;

    public ObservableSomeCallback(
        IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer)
    {
        this.observer = observer;
    }

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string,IEnumerable<Tuple<string, int, object>>(id, data));
    }
}

关于c# - 使用 Rx 限制非异步调用的回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5953362/

相关文章:

c# - 在 Windows 8.1 运行时应用程序中动态加载 Xaml

c# - 使用 Paypal 进行多购物车结帐

c# - TestRunType 的序列化抛出异常

c# - 多个 react 性可观察序列的ObservableForProperty

c# - 如何取消可观察序列

c# - 如何为泛型创建一个完全懒惰的单例

c# - 一个大数据库查询比许多小数据库查询有什么优势

c# - 如果我们在 C# 中将两个整数相除,如何始终得到一个整数

c# - 如何使用响应式(Reactive)扩展同时读取交错文件

c# - 为什么 C# 不提供用于传递属性作为引用的内部帮助程序?