c# - 使用 Reactive Extensions (Rx) 进行延迟和重复数据删除

标签 c# system.reactive reactive-programming

我想使用 Reactive Extensions 来转换一些消息并在一小段延迟后转发它们。

消息看起来像这样:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

输出看起来像这样:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

有几个要求:

  • 延迟的长度取决于消息的内容。
  • 每条消息都有一个GroupId
  • 如果较新的消息与等待传输的延迟消息具有相同的 GroupId,则应丢弃第一条消息,并在新的延迟期后仅传输第二条消息。

给定一个 Observable 和一个 Send 函数:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

我知道我可以使用 Select 来执行转换。

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • 如何应用消息指定延迟? (请注意,这可能/应该会导致消息传递无序。)
  • 如何删除具有相同 GroupId 的邮件的重复信息?
  • Rx 能解决这个问题吗?
  • 还有其他解决方法吗?

最佳答案

您可以使用 GroupBy 制作一个 IGroupedObservable,使用 Delay 延迟输出,使用 Switch 制作确保较新的值替换其组中以前的值:

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

关于实现的注意事项:如果分组值在消息发送之后到达(即在延迟之后),它将开始一个新的延迟

关于c# - 使用 Reactive Extensions (Rx) 进行延迟和重复数据删除,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4738134/

相关文章:

c# - 使用 XAML 字体系列会破坏一些 Unicode 字形

c# - 工厂根据通用类型创建对象 C#

c# - 在面向 .NET 4+ 的库中公开通知时,IObservable 是否应该优先于事件

spring-boot - Spring Webflux 抛出 "block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2"

c# - 如何在 Web 测试插件中调用 MoveDataTableCursor 来浏览表中有限的行集?

c# - NLS 环境设置和 Oracle Managed ODP.Net

c# - RX 中的冷可观察对象与普通可枚举对象之间的区别

system.reactive - 我如何 DumpLive 长时间运行的进程的结果?

javascript - 如果点击事件是在使用 Bacon.js 的拖动事件之后发生的,我该如何取消点击事件?

java - Flux to List<Objects> 无阻塞