我有两个数据源。
让我们想象一下:
- 系统 A 以更高的频率提供更高质量的数据,例如
1价格/1秒,但有时会失败,没有数据或
频率例如 1price/20sec - 系统 B 提供频率较低的数据,例如1 价格/10 秒
是否有任何优雅的方法使用 system.reactive 正常地从系统 A 检索数据,但当它失败(提要中没有数据)或速度减慢时,使用来自系统 B 的数据? 我想实现某种开关,当 A 源比 B 更快时,它将使用 A 源。我不想混合源,所以我一次只能使用 SystemA 或 SystemB。
class PriceFeed {
public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
{
}
private Price Convert(PriceFromA price) { //convert }
private Price Convert(PriceFromB price) { //convert }
}
最佳答案
有趣的问题。首先要做的是编写某种频率收集函数。可能看起来像这样:
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
return source.Buffer(lookback, measuringFreq, scheduler)
.Select(l => l.Count);
}
如果 measuringFreq
为 1 秒,lookback
为 5 秒,这意味着每秒我们都会看到过去 5 秒内发送的消息数量。快速而肮脏的例子:
var r = new System.Random();
var nums = Observable.Generate(
0,
i => i < 100,
i => i + 1,
i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad
nums
是一个可观察对象,平均每半秒生成一条消息(它随机选择 0 到 1 秒之间的持续时间)。 freq
每秒生成一个值,该值返回过去 5 秒内生成的消息数 nums
(平均应为 10)。在我的机器上最新运行时,我得到这个:
11
11
12
10
12
11
9
9
10
9
8
...
一旦我们找到了获取频率的方法,您就需要编写一个函数来将两个类似类型的可观察量合成在一起,并根据频率进行切换。我写的是:
public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? sourceB : sourceA) //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
.StartWith(sourceA)
.Switch();
return toReturn;
}
首先,我们使用 GetFrequency
获取两个可观测值的频率,然后将这两个值压缩在一起并进行比较。如果B比A更频繁,则使用B。如果它们相等或A更频繁,则使用A。
aAdvantage
变量允许您表达对 A 的更强偏好,而不是对 B 的偏好。0(默认值)表示源 A 赢得平局,或者当它更频繁时,否则 B 获胜。 2 意味着 B 在最近一段时间内必须比 A 多生成 3 条消息才能需要使用 B。
通过适当的发布
可观察值来避免多次订阅,看起来像这样:
public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback,
IScheduler scheduler, int aAdvantage = 0)
{
return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB =>
_sourceA.GetFrequency(measuringFreq, lookback, scheduler)
.Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
.StartWith(_sourceA)
.Switch()
))
}
我希望这有帮助。关于如何将其融入代码中,您没有留下太多内容。如果您需要,请添加 mcve .
关于c# - System.reactive - 按频率动态切换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56885979/