c# - 如何在 Reactive Extensions 中同时选择 Head 和 Tail

标签 c# system.reactive race-condition

我想创建以下组合器

public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{

}

选择器方法应该传递当前事件和一个可观察到所有 future 事件的尾部。 必须保证,在未来任何时候订阅尾部时,第一个接收到的事件将是在头部之后接收到的下一个事件。

我知道这需要一些缓冲,但我不太确定如何将它们放在一起。

这有一些不错的属性。你可以做

IObservable<IObservable<Unit>> windows =
source
    .HeadTailSelect((h,tail)=>Observable
        .Interval(TimeSpan.FromSeconds(1))
        .TakeUntil(tail)
        .Select(_=>Unit.Default)
    )

并避免竞争条件,即在窗口内需要 TakeUntil 才能在响应第一个事件后注册您错过了一些事件。

还有关于如何测试实现的任何想法的bonus karma

下面的测试用例对于实现来说是必要的,尽管它可能不足以证明是否避免了竞争条件。

public class HeadTailSelect : ReactiveTest
{
    TestScheduler _Scheduler = new TestScheduler();

    [Fact]
    public void ShouldWork()
    {

        var o = _Scheduler.CreateColdObservable
            (OnNext(10, "A")
            , OnNext(11, "B")
            , OnNext(12, "C")
            , OnNext(13, "D")
            , OnNext(14, "E")
            , OnNext(15, "F")
            , OnCompleted<string>(700)
            );

        var data = o.HeadTailSelect((head, tail) => tail.Take(2).ToList())
            .SelectMany(p=>p)
            .Select(l=>String.Join("-", l));


        var actual = _Scheduler.Start(() =>
            data
        , created: 0
        , subscribed: 1
        , disposed: 1000
        );

        actual.Messages.Count()
                .Should()
                .Be(7);

        var messages = actual.Messages.Take(6)
                             .Select(v => v.Value.Value)
                             .ToList();

        messages[0].Should().Be("B-C");
        messages[1].Should().Be("C-D");
        messages[2].Should().Be("D-E");
        messages[3].Should().Be("E-F");
        messages[4].Should().Be("F");
        messages[5].Should().Be("");

    }
}

最佳答案

这是通过上述测试的候选解决方案。但是我不确定它是否满足要求。

/// <summary>
/// Pass the head and tail of the observable to the
/// selector function. Note that 
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="U"></typeparam>
/// <param name="source"></param>
/// <param name="fn"></param>
/// <returns></returns>
public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
    var tail = new Subject<T>();
    return Observable.Create<U>(observer =>
    {
        return source.Subscribe(v =>
        {
            tail.OnNext(v);
            var u = fn(v, tail);
            observer.OnNext(u);

        }
        ,e=> { tail.OnCompleted();observer.OnError(e);  }
        ,()=> { tail.OnCompleted();observer.OnCompleted();  });
    });
}

请注意,u 很可能是某种IObservable,应该立即订阅。如果这样做,我认为一切都应该没问题。

关于c# - 如何在 Reactive Extensions 中同时选择 Head 和 Tail,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21904898/

相关文章:

c# - 拍摄 IObservable<T> 的快照

c# - 从头开始实现 IObservable<T>

Javascript线程竞争条件

c++ - 我如何证明 volatile 分配不是原子的?

C#删除类的实例?

C# List<> Order with 3 properties .Net 2.0

c# - 如何判断哪个面板触发了点击事件

c# - HttpWebResponse 不会针对并发出站请求进行扩展

c# - FlatMapC# 响应式(Reactive)扩展中的最新版本

c# - 任务并行库 (TPL) 是否处理竞争条件