c# - 对 IEnumerable<T> 的可观察流进行分区,并在 Reactive Extensions 中延迟

标签 c# system.reactive

我有以下用于分区 IEnumerable<T> 的 Rx 扩展方法并延迟每个分区值的产生。它使用 IEnumerable<T>对数据进行分区的扩展,这也通过单元测试显示。

有没有比使用 Observable.Timer().Wait() 更好的“延迟”方法?方法调用?

public static class RxExtensions
{
    public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(
        this IObservable<IEnumerable<T>> source, int size, TimeSpan interval,
        IScheduler scheduler = null)
    {
        if (scheduler == null)
        {
            scheduler = TaskPoolScheduler.Default;
        }

        var intervalEnabled = false;
        return source.SelectMany(x => x.Partition(size).ToObservable())
            .Window(1)
            .SelectMany(x =>
            {
                if (!intervalEnabled)
                {
                    intervalEnabled = true;
                }
                else
                {
                    Observable.Timer(interval, TaskPoolScheduler.Default).Wait();
                }

                return x;
            })
            .ObserveOn(scheduler);
    } 
}

public static class EnumerableExtensions
{
    public static IEnumerable<IEnumerable<T>> Partition<T>(
        this IEnumerable<T> source, int size)
    {
        using (var enumerator = source.GetEnumerator())
        {
            var items = new List<T>();
            while (enumerator.MoveNext())
            {
                items.Add(enumerator.Current);
                if (items.Count == size)
                {
                    yield return items.ToArray();

                    items.Clear();
                }
            }
           
            if (items.Any())
            {
                yield return items.ToArray();
            }
        }
    }
}

Rx扩展方法测试如下:

static void Main(string[] args)
{
     try
     {
         var data = Enumerable.Range(0, 10);
         var interval = TimeSpan.FromSeconds(1);

         Observable.Return(data)
            .PartitionWithInterval(2, interval)
            .Timestamp()
            .Subscribe(x =>
                {
                   var message = $"{x.Timestamp} - count = {x.Value.Count()}" +
                       $", values - {x.Value.First()}, {x.Value.Last()}";
                   Console.WriteLine(message);
                });

           Console.ReadLine();
       }
       catch (Exception e)
       {
           Console.WriteLine(e);
       }
}

最佳答案

应该这样做:

public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(this IObservable<IEnumerable<T>> source, int size, TimeSpan interval, IScheduler scheduler = null)
{
    if (scheduler == null)
    {
        scheduler = TaskPoolScheduler.Default;
    }

    return source
        //don't need the .ToObservable() call, since Zip can work on IEnumerable + IObservable.
        .SelectMany(x => x.Partition(size)) 
        .Zip(Observable.Interval(interval, scheduler).StartWith(0), (x, _) => x)
        .ObserveOn(scheduler);
}

有趣的是 PartitionWithInterval 实际上是如何调用 PartitionInterval 的。

StartWith 就在那里,因此您可以立即删除一个分区:类似于您拥有 intervalEnabled 标志的方式。

关于c# - 对 IEnumerable<T> 的可观察流进行分区,并在 Reactive Extensions 中延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44029098/

相关文章:

c# - 等待不使用当前的 SynchronizationContext

c# - 使用 C# 将文本框添加到网格

c# - 将 IObservable<bool> 和 IObservable<Exception> 合并到一个可观察量,当出现异常时,该可观察量会发生 OnErrors

c# - 如何合并两个 Observable,以便在任何 Observable 完成时结果完成?

c# - 响应式扩展 .MaxBy

system.reactive - 当最后一个观察者退订时,我该如何创建一个Rx可观察的站来停止发布事件?

c# - 我可以/应该使用隐式运算符而不是覆盖 ToString 吗?

c# - RunWorkerAsync 完成时关闭BackgroundWorker

c# - 按两个属性就地排序列表

c# - 如何使用 Reactive Extensions 实现事件