c# - 如何使用 1000 个观察者高效运行 Observable.Where()?

标签 c# performance system.reactive

我是 Rx 新手,并且非常喜欢它。我发现我们代码的现有部分可以通过使用它来大大简化。我遇到了性能问题,如果有专家可以帮助我,我将不胜感激。

以前,我有一个手动实现的观察者类,它接受订阅以及关键过滤器。当事件进入类时,它将使用提供的键来查找哪些观察者需要回调。这是代码的非常简化的版本:

 class OldClass
 {
    private Dictionary<string, List<Action<UsefulInfo>> _callbacks = 
        new Dictionary<string, List<Action<UsefulInfo>>();

    void Subscribe(string key, Action<UsefulInfo> callback)
    {
        _callbacks[key].Add(callback);
    }

    // Some event happens that we want to notify subscribers about
    void EventHandler(object sender, SomeEventArgs e)
    {
        // Create object for callbacks
        UsefulInfo info = CreateUsefulInfo(e);

        string key = info.Key;

        // Look up callbacks for key
        _callbacks[key].ForEach(callback => callback(info));
    }
 }

我已将其更新为使用 Rx,如下所示:

class NewClass
{
    private Subject<UsefulInfo> _subject = new Subject<UsefulInfo>();
    private IObservable<UsefulInfo> _observable;

    public NewClass()
    {
        _observable = _subject.ToObservable();
    }

    IDisposable Subscribe(string key, Action<UsefulInfo> callback)
    {
        return _observable.Where(x => x.Key == key).Subscribe(callback);
    }

    // Some event happens that we want to notify subscribers about
    void EventHandler(object sender, SomeEventArgs e)
    {
        UsefulInfo info = CreateUsefulInfo(e);

        _observable.OnNext(info);
    }
 }

旧代码执行 O(1) 字典键查找来查找回调,但新 Rx 代码调用Where Func O(n) 次。我有数千名观察者。

有没有一种方法可以给 Rx 一个返回一个键的 Func,然后它可以在内部使用该键将观察者存储在字典中?还有其他方法可以提高性能吗?或者我是否以非预期的方式使用该框架?

最佳答案

您可以使用Publish来共享观察者在 Where'd 序列上,然后使用 RefCountIConnectableObservable 上更智能地管理对源的订阅。

在您的场景中,我想您会将这些“已发布”的可观察值存储在字典中并按需添加到其中。

这是一个基本的实现:

class NewClass 
{ 
    private Subject<UsefulInfo> _subject = new Subject<UsefulInfo>(); 
    private IDictionary<string, IObservable<UsefulInfo>> _keyedObservables; 

    public NewClass() 
    { 
        _keyedObservables = new Dictionary<string, IObservable<UsefulInfo>>();
    } 

    IDisposable Subscribe(string key, Action<UsefulInfo> callback) 
    { 
        // NOT threadsafe for concurrent subscriptions!
        if (!_keyedObservables.Contains(key))
        {
            var keyedAndPublished = _subject.Where(x => x.Key == key)
                .Publish()
                .RefCount();

            _keyedObservables.Add(key, keyedAndPublished);
        }

        return _keyedObservables[key].Subscribe(callback);
    } 

    // Some event happens that we want to notify subscribers about 
    void EventHandler(object sender, SomeEventArgs e) 
    { 
        UsefulInfo info = CreateUsefulInfo(e); 

        _observable.OnNext(info); 
    } 
} 

关于c# - 如何使用 1000 个观察者高效运行 Observable.Where()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11149579/

相关文章:

c# - 隐藏工具栏绑定(bind)错误

c# - 尝试将 Autofac 作为嵌入式程序集加载时出现 FileNotFoundException

javascript - 组件和 cms 驱动应用程序的 AngularJS 最佳实践

c# - 具有 in 修饰符的事件参数的 Observable.FromEvent 会导致异常

system.reactive - 如何采取第一次发生,然后抑制事件 2 秒(RxJS)

c# - Travis-CI 构建失败 "The type or namespace name ' 扩展在命名空间中不存在”

c# - 使用 Open XML SDK 读取上传的 pptx 文件

php - MySQL DATETIME 函数美与性能(速度)

performance - 为什么haskell中两个版本的归并排序有1000倍的性能差异

c# - 如何在异步场景中测试 Rx observable?