c# - 如何基于 Func<T> 将 IObservable<T> 窗口化/缓冲到 block 中

标签 c# system.reactive observable

给定一个类:

class Foo { DateTime Timestamp {get; set;} }

...和一个IObservable<Foo> , 保证单调递增 Timestamp s,我怎样才能生成 IObservable<IList<Foo>>根据那些 Timestamp 分块到列表中是吗?

即每个IList<Foo>应该有五秒钟的事件,或其他什么。我知道我可以使用 BufferTimeSpan重载,但我需要从事件本身而不是挂钟中抽出时间。 (除非有一种聪明的方法在这里提供 IScheduler,它使用 IObservable 本身作为 .Now 的来源?)

如果我尝试使用 Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)像这样重载:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
    x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();

pub.Buffer(windows).Subscribe(x => t.Dump()));  // linqpad
pub.Connect();

...然后是 IList实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区。

例如带时间戳 [0, 1, 10, 11, 15]你会得到 [[0], [1, 10], [11, 15]] 的 block 而不是 [[0, 1], [10, 11], [15]]

最佳答案

这是一个想法。组关键条件是“窗口号”,我使用 GroupByUntil。这会在您的示例中为您提供所需的输出(并且我使用了一个 int 流,就像那个示例一样 - 但您可以替换任何您需要的内容来为您的窗口编号)。

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));                  
            
        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        .Subscribe(Console.WriteLine);
        
        scheduler.Start();
    }
}

注意事项

  1. 我们发布源流是因为我们会订阅不止一次。
  2. 这是一个创建组键的函数 - 使用它根据您的项目类型生成窗口编号。
  3. 这是组终止条件 - 使用它来检查另一个窗口中的项目的源流。请注意,这意味着窗口不会关闭,直到窗口外的元素到达或源流终止。如果您考虑一下,这是显而易见的 - 您想要的输出需要在窗口结束后考虑下一个元素。请注意,如果您的来源与实时有任何关系,您可以将其与 Observable.Timer+Select 合并,输出您的术语的空/默认实例以提前终止流。
  4. SelectMany 将组放入列表并展平流。

如果包含 nuget 包 rx-testing,这个示例将在 LINQPad 中运行得很好。新建一个测试实例并运行 Test() 方法。

关于c# - 如何基于 Func<T> 将 IObservable<T> 窗口化/缓冲到 block 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26383908/

相关文章:

c# - 库的设计时验证

c# - 响应式扩展 Enumerable.Range/Observable.FromAsync 上的 OperationCancelled 异常

c# - 在 C# 中使用夏令时处理时区

c# Nhibernate 使用动态参数创建查询

c# - ASP.NET Core 3.1 Web API : can it be self-hosted as Windows service with https and use some certificate like IIS?

xamarin.ios - Rx 2.1 和 Xamarin

c# - 如果第一个为空,则切换到不同的 IObservable

c# - ReactiveUI - 为什么简单的 ReactiveList.Changed -> ToProperty 不起作用?

Angular 2 - 没有响应将对象发送到具有 Observable 服务的 <router-outlet> 组件

android - RxJava + 改造,获取列表并为每个项目添加额外信息