我想使用 Reactive Extensions 来转换一些消息并在一小段延迟后转发它们。
消息看起来像这样:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
输出看起来像这样:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
有几个要求:
- 延迟的长度取决于消息的内容。
- 每条消息都有一个GroupId
- 如果较新的消息与等待传输的延迟消息具有相同的 GroupId,则应丢弃第一条消息,并在新的延迟期后仅传输第二条消息。
给定一个 Observable
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
我知道我可以使用 Select 来执行转换。
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- 如何应用消息指定延迟? (请注意,这可能/应该会导致消息传递无序。)
- 如何删除具有相同 GroupId 的邮件的重复信息?
- Rx 能解决这个问题吗?
- 还有其他解决方法吗?
最佳答案
您可以使用 GroupBy
制作一个 IGroupedObservable
,使用 Delay
延迟输出,使用 Switch
制作确保较新的值替换其组中以前的值:
IObservable<InMsg> inMessages;
inMessages
.GroupBy(msg => msg.GroupId)
.Select(group =>
{
return group.Select(groupMsg =>
{
TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here
return Observable.Return(outMsg).Delay(delay);
})
.Switch();
})
.Subscribe(outMsg => Console.Write("OutMsg received"));
关于实现的注意事项:如果分组值在消息发送之后到达(即在延迟之后),它将开始一个新的延迟
关于c# - 使用 Reactive Extensions (Rx) 进行延迟和重复数据删除,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4738134/