我想创建以下组合器
public static IObservable<U> HeadTailSelect<T, U>
(this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
}
选择器方法应该传递当前事件和一个可观察到所有 future 事件的尾部。 必须保证,在未来任何时候订阅尾部时,第一个接收到的事件将是在头部之后接收到的下一个事件。
我知道这需要一些缓冲,但我不太确定如何将它们放在一起。
这有一些不错的属性。你可以做
IObservable<IObservable<Unit>> windows =
source
.HeadTailSelect((h,tail)=>Observable
.Interval(TimeSpan.FromSeconds(1))
.TakeUntil(tail)
.Select(_=>Unit.Default)
)
并避免竞争条件,即在窗口内需要 TakeUntil
才能在响应第一个事件后注册您错过了一些事件。
还有关于如何测试实现的任何想法的bonus karma。
下面的测试用例对于实现来说是必要的,尽管它可能不足以证明是否避免了竞争条件。
public class HeadTailSelect : ReactiveTest
{
TestScheduler _Scheduler = new TestScheduler();
[Fact]
public void ShouldWork()
{
var o = _Scheduler.CreateColdObservable
(OnNext(10, "A")
, OnNext(11, "B")
, OnNext(12, "C")
, OnNext(13, "D")
, OnNext(14, "E")
, OnNext(15, "F")
, OnCompleted<string>(700)
);
var data = o.HeadTailSelect((head, tail) => tail.Take(2).ToList())
.SelectMany(p=>p)
.Select(l=>String.Join("-", l));
var actual = _Scheduler.Start(() =>
data
, created: 0
, subscribed: 1
, disposed: 1000
);
actual.Messages.Count()
.Should()
.Be(7);
var messages = actual.Messages.Take(6)
.Select(v => v.Value.Value)
.ToList();
messages[0].Should().Be("B-C");
messages[1].Should().Be("C-D");
messages[2].Should().Be("D-E");
messages[3].Should().Be("E-F");
messages[4].Should().Be("F");
messages[5].Should().Be("");
}
}
最佳答案
这是通过上述测试的候选解决方案。但是我不确定它是否满足要求。
/// <summary>
/// Pass the head and tail of the observable to the
/// selector function. Note that
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="U"></typeparam>
/// <param name="source"></param>
/// <param name="fn"></param>
/// <returns></returns>
public static IObservable<U> HeadTailSelect<T, U>
(this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
var tail = new Subject<T>();
return Observable.Create<U>(observer =>
{
return source.Subscribe(v =>
{
tail.OnNext(v);
var u = fn(v, tail);
observer.OnNext(u);
}
,e=> { tail.OnCompleted();observer.OnError(e); }
,()=> { tail.OnCompleted();observer.OnCompleted(); });
});
}
请注意,u
很可能是某种IObservable
,应该立即订阅。如果这样做,我认为一切都应该没问题。
关于c# - 如何在 Reactive Extensions 中同时选择 Head 和 Tail,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21904898/