c# - 如何获取 IObservable<IObservable<T>> 的最新变化事件?

标签 c# .net system.reactive reactive-programming rx.net

我的系统有很多状态对象——连接状态、CPU 负载、登录用户等等。所有此类事件都合并到单个可观察流中。

我想制作一个管理实用程序来显示系统的实际状态并显示所有这些计数器。

我如何创建一个包含所有计数器的最后更改值列表的可观察对象?

这是我想要的弹珠图:

s1 (cpu):               -s1_v1----s1_v1---s1_v2
s2 (users count):       --s2_v1--s2_v1---------s2_v2
s3 (some cat purr/sec)  ----s3_v1----s3_v1----s3_v1

flatten sequence: s1_v1-s2_v1-s3_v1-s2_v1-s1_v1-s3_v1-s1_v2-s3_v1-s2_v2

期望的输出:

s1_v1|s1_v1|s1_v1|s1_v2|s1_v2
      s2_v1|s2_v1|s2_v1|s2_v2
            s3_v1|s3_v1|s3_v1

到目前为止我可以实现这个:

public class StatusImplementation
{
    public static IObservable<IDictionary<TKey, TValue>> Status<TKey, TValue>(
        params IObservable<KeyValuePair<TKey, TValue>>[] observables)
    {
        var uniqueObservables = observables
            .Select(x => x.Publish().RefCount().DistinctUntilChanged());

        return Observable.Create<IDictionary<TKey, TValue>>(o =>
        {
            var compositeDisposable = new CompositeDisposable();
            var dictionary = new Dictionary<TKey, TValue>();

            foreach (var uniqueObservable in uniqueObservables)
            {
                var disposable = uniqueObservable.Subscribe(x =>
                {
                    if (dictionary.ContainsKey(x.Key) && !dictionary[x.Key].Equals(x.Value))
                    {
                        var newDictionary = new Dictionary<TKey, TValue>(dictionary);
                        newDictionary[x.Key] = x.Value;
                        dictionary = newDictionary;
                    }
                    else
                    {
                        dictionary.Add(x.Key, x.Value);
                    }

                    o.OnNext(dictionary);
                });
                compositeDisposable.Add(disposable);
            }

            return compositeDisposable;
        });
    }
}

这是一个用法示例:

        var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
            .Select(x => new KeyValuePair<string, long>("event 1", x));
        var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
            .Select(x => new KeyValuePair<string, long>("event 2", x));
        var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
            .Select(x => new KeyValuePair<string, long>("event 3", x));

        var combined = f1.Merge(f2).Merge(f3);

        StatusImplementation.Status(f1, f2, f3)
            .Select(x => string.Join(", ", x.ToList()))
            .Dump("\tstatus");

        combined.Dump("normal");

还有 Dump 函数(来自 Lee Campbell 的 great 书):

    public static void Dump<T>(this IObservable<T> source, string name)
    {
        source.Subscribe(
            i => Console.WriteLine("{0}-->{1}", name, i),
            ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
            () => Console.WriteLine("{0} completed", name));
    }

所以问题是:有没有更好的方法来实现这个功能?可能不在 observable 中使用 Dictionary?

谢谢。

最佳答案

因此,如果您从组合 observable 开始——它可以从任意数量的源 observables 中生成——那么您可以这样做:

var query =
    combined
        .Scan(
            new Dictionary<string, long>() as IDictionary<string, long>,
            (d, kvp) =>
            {
                var d2 = new Dictionary<string, long>(d) as IDictionary<string, long>;
                d2[kvp.Key] = kvp.Value;
                return d2;
            });

这将为 combined observable 产生的每个值返回一系列字典对象。每个字典对象将是一个不同的实例 - 如果返回相同的实例,您将不断更改值,这可能会导致线程问题。

关于c# - 如何获取 IObservable<IObservable<T>> 的最新变化事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30400898/

相关文章:

c# - MigraDoc 页脚位置

c# - Visual Studio 2015 空白应用程序(通用 Windows): CS0731

c# - Entity Framework 5 - 派生类的基于枚举的鉴别器

c# - 通过IQueryable批量枚举

system.reactive - 要求对 Reactive Extensions (RX) 进行清晰、如画的解释?

c# - 从 C# 运行 Powershell

ios - 我应该为 ios 客户端启用 cors 吗?

.net - 测试上下文为空

.net - 在 .NET 中优化 System.Drawing 的 PNG 输出

c# - react 性 : Trying to understand how Subject<T> work