c# - 在 Rx 中,如何对一段时间后的最新项目进行分组?

标签 c# system.reactive

抱歉,如果标题不是很清楚,我想不出更好的...

我正在接收 IObservable<char> 形式的用户输入,我想将其转换为 IObservable<char[]> ,通过在每次用户停止输入超过 1 秒时对字符进行分组。因此,例如,如果输入如下:

h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)

我希望可观察到的输出是:

['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']

我怀疑解决方案相当简单,但我找不到正确的方法...我尝试使用 Buffer , GroupByUntil , Throttle和其他一些,无济于事。

欢迎提出任何想法!


编辑:我有一些几乎可以工作的东西:

        _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

但我需要在每次输入新字符时重置延迟...

最佳答案

BufferThrottle 就足够了,如果你的源很热的话。要使其成为热点,您可以使用 .Publish().RefCount() 来确保您最终只订阅一次源。

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source,
                                              TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    //defer dueTime checking to Throttle
    var hot = source.Publish().RefCount();
    return hot.Buffer(() => hot.Throttle(dueTime));
}

关于c# - 在 Rx 中,如何对一段时间后的最新项目进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9985125/

相关文章:

c# - asp.net mvc 图像路径和虚拟目录

c# - 适用于 Android 的绑定(bind)库 Mono

c# - Observable.Interval 等待 Action 完成

c# - ReactiveCommand 未按预期处理可观察调用

c# - EF 生成的查询执行时间过长

c# - Timespan.TotalDays - 无法将 double 转换为字符串

c# - ASP MVC Url 编码双转义序列

event-handling - 在单线程应用程序中,EventBus/PubSub 与(响应式(Reactive)扩展)RX 的代码清晰度比较

c# - 并行处理一组 url 并返回一个 IEnumerable

c# - 接收 : How to buffer events (ring buffer) and only flush them when a special event occurs?