c# - Rx .NET - 强制缓冲区发出

标签 c# system.reactive rx.net

我想使用 Rx 缓冲区功能:

var source = new Subject<Price>();
var buffer = source
    .Buffer(TimeSpan.FromSeconds(30), 5)
    .Where(p => p.Any());

这意味着当缓冲区达到 5 大小时或自上次发出后 30 秒后发生发出(向订阅者发布)。

但我需要能够按需发出 - 例如当我收到高优先级序列项时。然后我想将它添加到可观察的 (source.OnNext()) 并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。

我知道我可以添加以下代码:

var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);

并调用 flusher.OnNext(highPriorityItem) 我将让它发出。

但在这种情况下,我有两个独立的序列,有两个不同的发射。当缓冲区已满或特定项目按顺序出现时,我需要一个发射器。

Force flush count-type Observable.Buffer c#Force flush to Observable.Buffer c#好像不适合我

最佳答案

我认为 decPL 在这里有基本的想法,但他的解决方案并不稳定。根据 input observable 的调度程序,即使以正确的顺序订阅,您也可能会得到不可预测的结果。那是因为对 input 有多个独立的订阅。您需要通过 .Publish(...) 调用来推送这一切,以确保只有一个订阅。

它还需要一种在处理订阅时进行清理的方法。因此它还需要通过 .Create(...) 调用来运行。

方法如下:

var input = new Subject<Price>();

IObservable<IList<Price>> query =
    input
        .Publish(i =>
            Observable
                .Create<IList<Price>>(o =>
                {
                    var timeBuffer =
                        Observable
                            .Timer(TimeSpan.FromSeconds(10.0))
                            .Select(n => Unit.Default);
                    var flush =
                        i
                            .Where(p => p.IS_IMPORTANT)
                            .Select(n => Unit.Default);
                    var sizeBuffer =
                        i
                            .Buffer(5)
                            .Select(l => Unit.Default);
                    return
                        i
                            .Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush))
                            .SelectMany(w => w.ToList())
                            .Subscribe(o);
                }));

query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w));

关于c# - Rx .NET - 强制缓冲区发出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46688359/

相关文章:

c# - 要求接口(interface)实现具有静态 Parse 方法

c# - 写一个Rx "RetryAfter"扩展方法

c# - 如何获取 IObservable<IObservable<T>> 的最新变化事件?

c# - 如何将 "IObservable<bool> IsActive"和 "bool IsEnabled"组合在一个订阅中

c# - 如何根据特定条件使用滑动窗口触发 RX 信号

c# - 只读静态字段初始化的线程安全

c# - Form.ShowDialog() 不显示启用调试的窗口

c# - ServiceStack 的 JSON 反序列化器解析无效的 JSON

c# - 响应式订阅在一段时间内没有跟随不同事件的事件

kotlin - Observable withLatestFrom 值