我正在多个线程中的UDP上接收消息。每次接待后,我都会提出MessageReceived.OnNext(message)
。
因为我使用多个线程,所以消息无序引发,这是一个问题。
如何通过消息计数器命令消息的引发?
(让我们说有一个message.counter属性)
必须记住,一条消息可能会在通信中丢失(如果在X消息未填满之后我们有一个埋藏的漏洞,我会提出下一条消息)
必须尽快引发消息(如果收到下一个计数器)
最佳答案
在说明检测丢失消息的要求时,您没有考虑到最后一条消息未到达的可能性。我添加了一个timeoutDuration
,如果在给定时间内没有任何响应,则刷新缓冲的消息-您可能想将其视为错误,请参阅注释以了解如何执行此操作。
我将通过定义具有以下签名的扩展方法来解决此问题:
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
TimeSpan timeoutDuration = new TimeSpan(),
int gapTolerance = 0)
source
是未排序消息流keySelector
是从消息中提取int
密钥的功能。我假设寻找的第一个键是0;必要时进行修改。 timeoutDuration
,如果省略,则没有超时tolerance
是等待出现故障的消息时保留的最大消息数。传递0
以保存任意数量的消息scheduler
是用于超时的调度程序,用于测试目的,如果未指定,则使用默认值。 演练
我将在此处逐行演示。下面重复完整的实现。
分配默认计划程序
首先,如果未提供默认调度程序,则必须分配一个默认调度程序:
scheduler = scheduler ?? Scheduler.Default;
安排超时
现在,如果请求超时,我们将使用一个副本替换源,该副本将简单地终止,如果消息没有到达
OnCompleted
,则发送timeoutDuration
。if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
如果您希望发送
TimeoutException
,只需删除Timeout
的第二个参数-空流,以选择执行此操作的重载。请注意,我们可以安全地与所有订阅者共享此文件,因此它位于Observable.Create
的调用之外。创建订阅处理程序
我们使用
Observable.Create
构建流。每当发生订阅时,即会调用作为Create
的参数的lambda函数,并将调用观察者(o
)传递给我们。 Create
返回我们的IObservable<T>
,因此我们在这里返回它。return Observable.Create<TSource>(o => { ...
初始化一些变量
我们将在
nextKey
中跟踪下一个预期的键值,并创建一个SortedDictionary
来保存乱序消息,直到可以发送它们为止。int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
订阅源并处理消息
现在,我们可以订阅消息流(可能已应用超时)。首先,我们介绍
OnNext
处理程序。下一条消息分配给x
:return source.Subscribe(x => { ...
我们调用
keySelector
函数从消息中提取密钥:var key = keySelector(x);
如果消息具有旧密钥(因为它超出了我们对乱序消息的容忍度),我们将删除它并完成此消息(您可能希望采取其他措施):
// drop stale keys
if(key < nextKey) return;
否则,我们可能会有预期的密钥,在这种情况下,我们可以增加
nextKey
发送消息:if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
或者,我们将来可能会出现故障消息,在这种情况下,必须将其添加到缓冲区中。如果这样做,我们还必须确保缓冲区没有超出存储乱序消息的容限-在这种情况下,我们还将
nextKey
碰到缓冲区中的第一个键,因为它是SortedDictionary
,所以方便地使用下一个最低键:else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
现在,不管上面的结果如何,我们都需要清空所有准备就绪的键的缓冲区。为此,我们使用辅助方法。请注意,它会调整
nextKey
,因此我们必须小心通过引用传递它。我们只需遍历缓冲区读取,删除和发送消息,只要键彼此跟随即可,每次都增加nextKey
:private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
处理错误
接下来,我们提供一个
OnError
处理程序-这将通过任何错误,包括Timeout异常(如果您选择采用这种方式)。冲洗缓冲区
最后,我们必须处理
OnCompleted
。在这里,我选择清空缓冲区-如果乱序消息阻止了消息并且从未到达,则这是必要的。这就是为什么我们需要超时的原因:() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
全面实施
这是完整的实现。
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
int gapTolerance = 0,
TimeSpan timeoutDuration = new TimeSpan(),
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
return Observable.Create<TSource>(o => {
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
return source.Subscribe(x => {
var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
SendNextConsecutiveKeys(ref nextKey, o, buffer);
},
o.OnError,
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
});
}
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
测试线束
如果您在控制台应用程序中包含nuget
rx-testing
,则将在运行以下测试工具时运行以下命令:public static void Main()
{
var tests = new Tests();
tests.Test();
}
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable(
OnNext(100, 0),
OnNext(200, 2),
OnNext(300, 1),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 3),
OnNext(700, 7),
OnNext(800, 8),
OnNext(900, 9),
OnNext(1000, 6),
OnNext(1100, 12),
OnCompleted(1200, 0));
//var results = scheduler.CreateObserver<int>();
xs.Sort(
keySelector: x => x,
gapTolerance: 2,
timeoutDuration: TimeSpan.FromTicks(200),
scheduler: scheduler).Subscribe(Console.WriteLine);
scheduler.Start();
}
}
结束语
这里有各种各样有趣的替代方法。我之所以采用这种主要命令性方法,是因为我认为这是最容易遵循的方法-但您可能可以使用一些花哨的方式对这些恶作剧进行分组。我知道关于Rx始终如一的事实-总是有很多方法可以给猫皮!
我也不完全满意这里的超时概念-在生产系统中,我想实现一些检查连接性的方法,例如心跳或类似信号。我没有涉及到它,因为显然它将是针对特定应用程序的。此外,之前在这些板上以及其他地方已经讨论过心跳(such as on my blog for example)。
关于udp - 订购 react 性扩展事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26450521/