我想使用 Rx 缓冲区功能:
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
这意味着当缓冲区达到 5 大小时或自上次发出后 30 秒后发生发出(向订阅者发布)。
但我需要能够按需发出 - 例如当我收到高优先级序列项时。然后我想将它添加到可观察的 (source.OnNext()
) 并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。
我知道我可以添加以下代码:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
并调用 flusher.OnNext(highPriorityItem) 我将让它发出。
但在这种情况下,我有两个独立的序列,有两个不同的发射。当缓冲区已满或特定项目按顺序出现时,我需要一个发射器。
Force flush count-type Observable.Buffer c#和 Force flush to Observable.Buffer c#好像不适合我
最佳答案
我认为 decPL 在这里有基本的想法,但他的解决方案并不稳定。根据 input
observable 的调度程序,即使以正确的顺序订阅,您也可能会得到不可预测的结果。那是因为对 input
有多个独立的订阅。您需要通过 .Publish(...)
调用来推送这一切,以确保只有一个订阅。
它还需要一种在处理订阅时进行清理的方法。因此它还需要通过 .Create(...)
调用来运行。
方法如下:
var input = new Subject<Price>();
IObservable<IList<Price>> query =
input
.Publish(i =>
Observable
.Create<IList<Price>>(o =>
{
var timeBuffer =
Observable
.Timer(TimeSpan.FromSeconds(10.0))
.Select(n => Unit.Default);
var flush =
i
.Where(p => p.IS_IMPORTANT)
.Select(n => Unit.Default);
var sizeBuffer =
i
.Buffer(5)
.Select(l => Unit.Default);
return
i
.Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush))
.SelectMany(w => w.ToList())
.Subscribe(o);
}));
query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w));
关于c# - Rx .NET - 强制缓冲区发出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46688359/