c# - 一种以均匀间隔推送缓冲事件的方法

标签 c# .net system.reactive

我想要实现的是缓冲来自某些 IObservable 的传入事件(它们是突发的)并进一步释放它们,但是一个接一个,以均匀的间隔。 像这样:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

因为我对 Rx 很陌生,所以我不确定是否已经有一个 Subject 或一个运算符可以做到这一点。也许可以通过组合来完成?

更新:

感谢 Richard Szalay为了指出 Drain 运算符,我找到了另一个 example by James Miles Drain 运算符的使用情况。以下是我如何让它在 WPF 应用程序中运行:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();

我玩得很开心,因为省略调度程序参数会导致应用程序在 Debug模式下崩溃而不会显示任何异常(我需要学习如何处理 Rx 中的异常)。 Process 方法直接修改 UI 状态,但我想从中创建一个 IObservable 非常简单(使用 ISubject?)。

更新:

与此同时,我一直在试验 ISubject,下面的类做了我想要的 - 它及时释放缓冲的 Ts:

public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    void Step()
    {
        T next;
        lock (queue)
        {
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        }

        if (!idle)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}

为了清楚起见,这个简单的实现从 OnCompleted 和 OnError 中剥离,也只允许单个订阅。

最佳答案

它实际上比听起来更狡猾。

使用 Delay 不起作用,因为值仍然会批量发生,只是稍微延迟。

IntervalCombineLatestZip 一起使用是行不通的,因为前者会导致源值被跳过而后者会缓冲间隔值。

我认为新的 Drain 运算符 ( added in 1.0.2787.0 ) 与 Delay 相结合应该可以解决问题:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

Drain 运算符的工作方式与 SelectMany 类似,但会等到上一个输出完成后再使用下一个值调用选择器。 它仍然不是完全您所追求的( block 中的第一个值也将被延迟),但它很接近:上面的用法现在与您的大理石图匹配。 p>

编辑:显然框架中的Drain 不像SelectMany 那样工作。我会在官方论坛上寻求一些建议。与此同时,这里有一个 Drain 的实现,它可以满足您的需求:

编辑 09/11:修复了实现中的错误并更新了用法以匹配您请求的弹珠图。

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

关于c# - 一种以均匀间隔推送缓冲事件的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4123178/

相关文章:

c# - 类是否还应该负责将自身的数据存储到数据库中?

c# - LinqToSql 和抽象基类

c# - 内部抽象方法。为什么会有人拥有它们?

c# - 如何使用azure函数或逻辑应用程序监视sftp中的多个位置?

c# - 为可观察序列正确设置依赖谓词而不产生副作用的功能方法是什么?

c# - 将热可观察区间转换为冷可观察区间的响应式(Reactive)扩展方法

c# - .NET 相当于 java.awt.color.ColorSpace

c# - C# 中 BackgroundWorker 的性能问题

.net - 如何向用户发送电子邮件确认链接

c# - 使用 Rx 构建可观察的存储库