c# - 用一个无限的序列压缩,这个序列是真的然后总是假的

标签 c# system.reactive infinite

我做了一个扩展方法:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
                                          TimeSpan minDelay)
{
    return
        source.TimeInterval()
            .Select(
                (x, i) =>
                    Observable.Return(x.Value)
                        .Delay(i == 0
                            ? TimeSpan.Zero
                            : TimeSpan.FromTicks(
                                  Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
            .Concat();
}

这会创建一个新的可观察对象,它只允许项目以最小的时间间隔通过。

要消除初始延迟,有必要区别对待第一项。

可以看出,有一个测试是通过测试i == 0来判断我们是否在处理第一项。这里的问题是,如果我们处理超过 int.MaxValue 个项目,这将失败。

相反,我想到了以下顺序

var trueThenFalse = Observable.Return(true)
                    .Concat(Observable.Repeat(false))

并将其压缩到我的来源旁边:

source.TimeInterval().Zip(trueThenFalse, ...

但是当将这个无限序列传递给 Zip 时,我们似乎进入了一个紧密循环,其中 trueThenFalse 一次性发出所有项目(到无穷大)。失败。

我可以很容易地用副作用来解决这个问题(例如,在外部范围中的 bool),但这将代表我不满意的纯度损失。

有什么建议吗?

编辑

虽然行为不完全相同,但以下代码表现出一些令人讨厌的特征

var trueThenFalse = Observable.Return(true)
    .Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));

并最终死于 OOME。这是因为 trueThenFalse 似乎正在取消假脱机其所有值,但 Zip 没有及时消耗它们。

最佳答案

原来 Zip 有 another overload可以将 IObservable 序列与 IEnumerable 序列压缩在一起。

通过将 IObservable 的推送语义与 IEnumerable 的拉取语义相结合,可以让我的测试用例正常工作。

所以,用下面的方法:

private IEnumerable<T> Inf<T>(T item)
{
    for (;;)
    {
        yield return item;
    }
}

我们可以创建一个 IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));

然后用可观察源压缩它:

var src = Observable.Interval(TimeSpan.FromSeconds(1));
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));

...一切都按预期进行。

现在我的 RateLimiter 方法有以下实现:

public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                          TimeSpan minDelay)
{
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
    return
        source.TimeInterval()
            .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value)
                .Delay(firstTime
                    ? TimeSpan.Zero
                    : TimeSpan.FromTicks(
                        Math.Max(minDelay.Ticks - item.Interval.Ticks, 0))))

            .Concat();
}

关于c# - 用一个无限的序列压缩,这个序列是真的然后总是假的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40740620/

相关文章:

C#,有没有像golang那样的 'Defer Call'?

c# - 如何使用子目录中的 RESX 文件本地化 WPF 应用程序

c# - Reactive Extensions .NET 3.5 中的 ConcurrentDictionary 在哪里

f# - 具有历史记录的困难序列处理

c# - Windows 8 XAML - C++ 与 C# - 用户体验

c# - 当我写xml时如何增加进度条?

system.reactive - 是否有 RxJava 的 doOnSubscribe 的 C# System.Reactive 版本?

f# - 如何在 F# 中编写通用的递归扩展方法?

c++ - 即使使用适当的方面,非有限 float 的反序列化也会失败

java - Findbugs 无法检测所有无限循环