我有一个 Observable 序列,它以快速爆发的方式产生事件(即:五个事件一个接一个地发生,然后是长时间的延迟,然后是另一个事件的快速爆发,等等)。我想通过在事件之间插入一个短暂的延迟来消除这些爆发。以下图为例:
Raw: --oooo--------------ooooo-----oo----------------ooo| Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|
My current approach is to generate a metronome-like timer via Observable.Interval()
that signals when it's ok to pull another event from the raw stream. The problem is that I can't figure out how to then combine that timer with my raw unbuffered observable sequence.
IObservable.Zip()
is close to doing what I want, but it only works so long as the raw stream is producing events faster than the timer. As soon as there is a significant lull in the raw stream, the timer builds up a series of unwanted events that then immediately pair up with the next burst of events from the raw stream.
Ideally, I want an IObservable extension method with the following function signature that produces the bevaior I've outlined above. Now, come to my rescue StackOverflow :)
public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)
附言。我是 Rx 的新手,如果这是一个非常简单的问题,我深表歉意...
1。简单但有缺陷的方法
这是我最初的天真和简单的解决方案,但有很多问题:
public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
Queue<T> q = new Queue<T>();
source.Subscribe(x => q.Enqueue(x));
return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}
第一个明显的问题是,原始源的内部订阅返回的 IDisposable 丢失了,因此无法终止订阅。在此方法返回的 IDisposable 上调用 Dispose 会终止计时器,但不会终止现在不必要地填充队列而没有人从队列中提取事件的底层原始事件馈送。
第二个问题是无法将异常或流结束通知从原始事件流传播到缓冲流 - 在订阅原始源时它们会被忽略。
最后但并非最不重要的一点是,现在我得到了定期唤醒的代码,无论实际上是否有任何工作要做,我宁愿在这个美妙的新 react 世界中避免这种情况。
2。方法过于复杂
为了解决在我最初的简单方法中遇到的问题,我写了一个远更复杂的函数,它的行为很像IObservable.Delay()
(我使用了.NET Reflector阅读该代码并将其用作我的功能的基础)。不幸的是,许多样板逻辑(例如 AnonymousObservable
)在 system.reactive 代码之外无法公开访问,因此我不得不复制并粘贴大量 代码。此解决方案似乎可行,但考虑到它的复杂性,我不太相信它没有错误。
我简直不敢相信没有一种方法可以使用标准 Reactive 扩展的某种组合来完成此操作。我讨厌自己在无谓地重新发明轮子,而我尝试构建的模式似乎是一个相当标准的模式。
最佳答案
这实际上是 A way to push buffered events in even intervals 的副本,但我将在此处包含一个摘要(原文看起来很困惑,因为它考虑了一些备选方案)。
public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.Drain(x =>
Observable.Empty<int>()
.Delay(minDelay)
.StartWith(x)
);
}
我对 Drain 的实现与 SelectMany
类似,只是它会先等待先前的输出完成(您可以将其视为 ConactMany
,而 SelectMany
更像是 MergeMany
)。内置的 Drain
不会以这种方式工作,因此您需要包含以下实现:
public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(
this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}
关于c# - Rx IObservable 缓冲以平滑突发事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4505529/