c# - 如何使用响应式扩展实现 'handle remaining' 观察者

标签 c# .net system.reactive

我有一个 IObservable<string>以及一些根据某些条件处理字符串的观察者:

observable.Subscribe(s => { if (s.StartsWith("a")) {...} });
observable.Subscribe(s => { if (s.StartsWith("b")) {...} });
observable.Subscribe(s => { if (s.StartsWith("c")) {...} });
observable.Subscribe(s => { if (s.StartsWith("d")) {...} });
....

这是一个简化的示例(条件更复杂并且观察到的事件不是字符串)但您明白了。

我想要一个 IObserver<string>捕获所有未被任何其他观察者处理的字符串。可以随时添加具有不同条件(即:StartsWith("e"))的观察者,并且条件集不会重叠。

是否以某种方式支持这种情况?或者我是否必须将观察到的字符串标记为已处理并在所有其他观察者都尝试过后订阅未处理的字符串(以及我如何实现)?

最佳答案

我有两种方法。

第一个提供了一种将谓词/ Action 对链接在一起以“虹吸”匹配的值的方法。它遵循 Rx 运算符风格。

我可以这样写:

observable
    .Syphon(s => s.StartsWith("a"), s => { })
    .Syphon(s => s.StartsWith("b"), s => { })
    .Syphon(s => s.StartsWith("c"), s => { })
    .Syphon(s => s.StartsWith("d"), s => { })
    .Subscribe(s => { /* otherwise */ });

如果我有这个扩展方法:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    Func<T, bool> predicate,
    Action<T> action)
{
    if (source == null) throw new ArgumentNullException("source");
    if (predicate == null) throw new ArgumentNullException("predicate");
    if (action == null) throw new ArgumentNullException("action");
    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                if (predicate(t))
                {
                    action(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

它不允许您即时添加和删除谓词/ Action 对,但它是一个相当简单的运算符,可能会有用。

为了拥有完整的添加/删除功能,我想出了这种方法:

Func<Func<string, bool>, Action<string>, IDisposable> add;

observable
    .Syphon(out add)
    .Subscribe(s => { /* otherwise */ });

var startsWithA = add(s => s.StartsWith("a"), s => { /* a */ });
var startsWithB = add(s => s.StartsWith("b"), s => { /* b */ });
startsWithA.Dispose();
var startsWithC = add(s => s.StartsWith("c"), s => { /* c */ });
var startsWithD = add(s => s.StartsWith("d"), s => { /* d */ });
startsWithC.Dispose();
startsWithB.Dispose();
startsWithD.Dispose();

.Syphon(out add)扩展方法重载允许该方法有效地返回两个结果 - 正常返回值为 IObservable<T>第二个是 Func<Func<T, bool>, Action<T>, IDisposable> .第二个返回值允许将新的谓词/ Action 对添加到虹吸运算符,然后通过调用 Dispose 将其删除。在返回的订阅上 - 非常 Rx-ish。

扩展方法如下:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    out Func<Func<T, bool>, Action<T>, IDisposable> subscriber)
{
    if (source == null) throw new ArgumentNullException("source");

    var pas = new List<Tuple<Func<T, bool>, Action<T>>>();

    subscriber = (p, a) =>
    {
        lock (pas)
        {
            var tuple = Tuple.Create(p, a);
            pas.Add(tuple);
            return Disposable.Create(() =>
            {
                lock (pas)
                {
                    pas.Remove(tuple);
                }
            });
        }
    };

    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                Action<T> a = null;
                lock (pas)
                {
                    var pa = pas.FirstOrDefault(x => x.Item1(t));
                    if (pa != null)
                    {
                        a = pa.Item2;
                    }
                }
                if (a != null)
                {
                    a(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

我用这个测试了代码:

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2));

Func<Func<long, bool>, Action<long>, IDisposable> subscriber;
xs
    .Syphon(out subscriber)
    .Subscribe(x => Console.WriteLine(x));

var divBy3 = subscriber(
    x => x % 3 == 0,
    x => Console.WriteLine("divBy3"));

Thread.Sleep(2000);

var divBy2 = subscriber(
    x => x % 2 == 0,
    x => Console.WriteLine("divBy2"));

Thread.Sleep(2000);
divBy3.Dispose();
Thread.Sleep(2000);
divBy2.Dispose();
Thread.Sleep(10000);

它产生了:

divBy3
1
2
divBy3
4
5
divBy3
7
8
divBy3
divBy2
11
divBy3
13
divBy2
divBy3
divBy2
17
divBy3
19
divBy2
21
divBy2
23
divBy2
25
divBy2
27
divBy2
29
30
31
32
...

这似乎是正确的。如果这能为您解决问题,请告诉我。

关于c# - 如何使用响应式扩展实现 'handle remaining' 观察者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7793162/

相关文章:

.net - 用户界面允许 700 多种选择

c# - Action<Action> 是什么意思?

c# - CombineLatest 是否保留了可观察量的顺序?

c# - Unity 通过脚本设置 Player Settings Target Architectures

c# - 如何让IdentityServer将用户身份添加到访问 token 中?

c# - 如何创建可以处理或转换未知类型的 lambda 表达式?

c# - 生成具有动态可变时间间隔的事件

c# - ListBox ItemTemplate 中的 Togglebutton 中的 Textblock 中的文本换行

c# - 如果请求的实体在 Db 中不存在,我应该抛出哪个异常?

.net - IronRuby 是否正在积极开发中?