给定一个类:
class Foo { DateTime Timestamp {get; set;} }
...和一个IObservable<Foo>
, 保证单调递增 Timestamp
s,我怎样才能生成 IObservable<IList<Foo>>
根据那些 Timestamp
分块到列表中是吗?
即每个IList<Foo>
应该有五秒钟的事件,或其他什么。我知道我可以使用 Buffer
用TimeSpan
重载,但我需要从事件本身而不是挂钟中抽出时间。 (除非有一种聪明的方法在这里提供 IScheduler
,它使用 IObservable
本身作为 .Now
的来源?)
如果我尝试使用 Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)
像这样重载:
IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();
pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad
pub.Connect();
...然后是 IList
实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区。
例如带时间戳 [0, 1, 10, 11, 15]
你会得到 [[0], [1, 10], [11, 15]]
的 block 而不是 [[0, 1], [10, 11], [15]]
最佳答案
这是一个想法。组关键条件是“窗口号”,我使用 GroupByUntil
。这会在您的示例中为您提供所需的输出(并且我使用了一个 int 流,就像那个示例一样 - 但您可以替换任何您需要的内容来为您的窗口编号)。
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateHotObservable<int>(
OnNext(0, 0),
OnNext(1, 1),
OnNext(10, 10),
OnNext(11, 11),
OnNext(15, 15),
OnCompleted(16, 0));
xs.Publish(ps => // (1)
ps.GroupByUntil(
p => p / 5, // (2)
grp => ps.Where(p => p / 5 != grp.Key)) // (3)
.SelectMany(x => x.ToList())) // (4)
.Subscribe(Console.WriteLine);
scheduler.Start();
}
}
注意事项
- 我们发布源流是因为我们会订阅不止一次。
- 这是一个创建组键的函数 - 使用它根据您的项目类型生成窗口编号。
- 这是组终止条件 - 使用它来检查另一个窗口中的项目的源流。请注意,这意味着窗口不会关闭,直到窗口外的元素到达或源流终止。如果您考虑一下,这是显而易见的 - 您想要的输出需要在窗口结束后考虑下一个元素。请注意,如果您的来源与实时有任何关系,您可以将其与
Observable.Timer+Select
合并,输出您的术语的空/默认实例以提前终止流。 - SelectMany 将组放入列表并展平流。
如果包含 nuget 包 rx-testing,这个示例将在 LINQPad 中运行得很好。新建一个测试实例并运行 Test()
方法。
关于c# - 如何基于 Func<T> 将 IObservable<T> 窗口化/缓冲到 block 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26383908/