c# - 使用 Rx,如何在我的订阅方法运行时忽略除最新值之外的所有值

标签 c# system.reactive

使用 Reactive Extensions ,我想忽略在我的 Subscribe 方法运行时发生的来 self 的事件流的消息。 IE。有时我处理一条消息所花的时间比消息之间的时间要长,所以我想丢弃我没有时间处理的消息。

但是,当我的 Subscribe 方法完成时,如果确实有任何消息通过,我想处理最后一条。所以我总是处理最新的消息。

所以,如果我有一些代码可以:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

如果我们假设“100”需要很长时间来处理。然后我希望在“100”完成时处理“2”。 “1”应该被忽略,因为它已被“2”取代,而“100”仍在处理中。

这是我想要使用后台任务和 Latest()

的结果示例
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

但是,Latest() 是一个阻塞调用,我不希望有一个线程像这样等待下一个值(有时消息之间会有很长的间隔)。

我还可以通过使用来自 TPL Dataflow,BroadcastBlock 获得我想要的结果像这样:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

但这感觉应该可以直接在 Rx 中实现。执行此操作的最佳方法是什么?

最佳答案

这是一个类似于 Dave 的方法,但使用 Sample 代替(比 buffer 更合适)。我添加了一种与我添加到 Dave 的答案中的方法类似的扩展方法。

扩展名:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

请注意,它更简单,并且没有触发“空”缓冲区。发送到操作的第一个元素实际上来自流本身。

用法很简单:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

和结果:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10

关于c# - 使用 Rx,如何在我的订阅方法运行时忽略除最新值之外的所有值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11010602/

相关文章:

c# - 我可以使用响应式(Reactive)扩展来对 Windows 服务的调用进行排队吗?

c# - 在 WPF 中使用 Unity 解析时,SynchronizationContext.Current 为空

c# - 启用 HTTPS、IIS 托管的 WCF 服务,如何保护它?

c# - 如何用字符串替换控件的名称和方法?

c# - 如何用C#实现视频直播?

python - 在 RX 中构建双重无限轮询行为

c# - 在 c# 中是否有用于可观察对象的 if/then/else 运算符可用?

system.reactive - Rx Observable Window 带参数关闭功能

c# - Gridview 显示空行

c# - linq:ToDictionary 是否减少了返回的列数?