我有以下用于分区 IEnumerable<T>
的 Rx 扩展方法并延迟每个分区值的产生。它使用 IEnumerable<T>
对数据进行分区的扩展,这也通过单元测试显示。
有没有比使用 Observable.Timer().Wait()
更好的“延迟”方法?方法调用?
public static class RxExtensions
{
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(
this IObservable<IEnumerable<T>> source, int size, TimeSpan interval,
IScheduler scheduler = null)
{
if (scheduler == null)
{
scheduler = TaskPoolScheduler.Default;
}
var intervalEnabled = false;
return source.SelectMany(x => x.Partition(size).ToObservable())
.Window(1)
.SelectMany(x =>
{
if (!intervalEnabled)
{
intervalEnabled = true;
}
else
{
Observable.Timer(interval, TaskPoolScheduler.Default).Wait();
}
return x;
})
.ObserveOn(scheduler);
}
}
public static class EnumerableExtensions
{
public static IEnumerable<IEnumerable<T>> Partition<T>(
this IEnumerable<T> source, int size)
{
using (var enumerator = source.GetEnumerator())
{
var items = new List<T>();
while (enumerator.MoveNext())
{
items.Add(enumerator.Current);
if (items.Count == size)
{
yield return items.ToArray();
items.Clear();
}
}
if (items.Any())
{
yield return items.ToArray();
}
}
}
}
Rx扩展方法测试如下:
static void Main(string[] args)
{
try
{
var data = Enumerable.Range(0, 10);
var interval = TimeSpan.FromSeconds(1);
Observable.Return(data)
.PartitionWithInterval(2, interval)
.Timestamp()
.Subscribe(x =>
{
var message = $"{x.Timestamp} - count = {x.Value.Count()}" +
$", values - {x.Value.First()}, {x.Value.Last()}";
Console.WriteLine(message);
});
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
最佳答案
应该这样做:
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(this IObservable<IEnumerable<T>> source, int size, TimeSpan interval, IScheduler scheduler = null)
{
if (scheduler == null)
{
scheduler = TaskPoolScheduler.Default;
}
return source
//don't need the .ToObservable() call, since Zip can work on IEnumerable + IObservable.
.SelectMany(x => x.Partition(size))
.Zip(Observable.Interval(interval, scheduler).StartWith(0), (x, _) => x)
.ObserveOn(scheduler);
}
有趣的是 PartitionWithInterval
实际上是如何调用 Partition
和 Interval
的。
StartWith
就在那里,因此您可以立即删除一个分区:类似于您拥有 intervalEnabled
标志的方式。
关于c# - 对 IEnumerable<T> 的可观察流进行分区,并在 Reactive Extensions 中延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44029098/