c# - System.reactive - 按频率动态切换

标签 c# system.reactive

我有两个数据源。

让我们想象一下:

  • 系统 A 以更高的频率提供更高质量的数据,例如
    1价格/1秒,但有时会失败,没有数据或
    频率例如 1price/20sec
  • 系统 B 提供频率较低的数据,例如1 价格/10 秒

是否有任何优雅的方法使用 system.reactive 正常地从系统 A 检索数据,但当它失败(提要中没有数据)或速度减慢时,使用来自系统 B 的数据? 我想实现某种开关,当 A 源比 B 更快时,它将使用 A 源。我不想混合源,所以我一次只能使用 SystemA 或 SystemB。


    class PriceFeed {

        public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
        {


        }


        private Price Convert(PriceFromA price) { //convert }

        private Price Convert(PriceFromB price) { //convert }

    }

最佳答案

有趣的问题。首先要做的是编写某种频率收集函数。可能看起来像这样:

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
    return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
    return source.Buffer(lookback, measuringFreq, scheduler)
        .Select(l => l.Count);
}

如果 measuringFreq 为 1 秒,lookback 为 5 秒,这意味着每秒我们都会看到过去 5 秒内发送的消息数量。快速而肮脏的例子:

var r = new System.Random();
var nums = Observable.Generate(
    0, 
    i => i < 100, 
    i => i + 1, 
    i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad

nums 是一个可观察对象,平均每半秒生成一条消息(它随机选择 0 到 1 秒之间的持续时间)。 freq 每秒生成一个值,该值返回过去 5 秒内生成的消息数 nums(平均应为 10)。在我的机器上最新运行时,我得到这个:

11
11
12
10
12
11
9
9
10
9
8
...

一旦我们找到了获取频率的方法,您就需要编写一个函数来将两个类似类型的可观察量合成在一起,并根据频率进行切换。我写的是:

public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
    var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
    var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);

    var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
        .Select(freqDifference => freqDifference < 0 ? sourceB : sourceA)   //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
        .StartWith(sourceA)
        .Switch();

    return toReturn;
}

首先,我们使用 GetFrequency 获取两个可观测值的频率,然后将这两个值压缩在一起并进行比较。如果B比A更频繁,则使用B。如果它们相等或A更频繁,则使用A。

aAdvantage 变量允许您表达对 A 的更强偏好,而不是对 B 的偏好。0(默认值)表示源 A 赢得平局,或者当它更频繁时,否则 B 获胜。 2 意味着 B 在最近一段时间内必须比 A 多生成 3 条消息才能需要使用 B。

通过适当的发布可观察值来避免多次订阅,看起来像这样:

public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, 
    IScheduler scheduler, int aAdvantage = 0)
{
    return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB => 
        _sourceA.GetFrequency(measuringFreq, lookback, scheduler)
            .Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
            .Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
            .StartWith(_sourceA)
            .Switch()

    ))
}

我希望这有帮助。关于如何将其融入代码中,您没有留下太多内容。如果您需要,请添加 mcve .

关于c# - System.reactive - 按频率动态切换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56885979/

相关文章:

c# - VS 2010 中项目的平台配置

c# - 元组==混淆

c# - 将 ISerializable 与 DataContractSerializer 一起使用时,如何阻止序列化程序输出类型信息?

c# - 在冷 IObservable 上暂停和恢复订阅

azure - Windows Azure 上的 RX

c# - 并发流图

c# - Entity Framework - 添加 75k 行需要多长时间?

c# - 即使在构造函数中初始化后,SelectList 也会抛出 "Object reference not set"错误

c# - Observable.Create : CancellationToken doesn't transition to IsCancellationRequested

.net - 带有 RX 扩展的 LINQ