使用 Reactive Extensions ,我想忽略在我的 Subscribe
方法运行时发生的来 self 的事件流的消息。 IE。有时我处理一条消息所花的时间比消息之间的时间要长,所以我想丢弃我没有时间处理的消息。
但是,当我的 Subscribe
方法完成时,如果确实有任何消息通过,我想处理最后一条。所以我总是处理最新的消息。
所以,如果我有一些代码可以:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
如果我们假设“100”需要很长时间来处理。然后我希望在“100”完成时处理“2”。 “1”应该被忽略,因为它已被“2”取代,而“100”仍在处理中。
这是我想要使用后台任务和 Latest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
但是,Latest() 是一个阻塞调用,我不希望有一个线程像这样等待下一个值(有时消息之间会有很长的间隔)。
我还可以通过使用来自 TPL Dataflow, 的 BroadcastBlock
获得我想要的结果像这样:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
但这感觉应该可以直接在 Rx 中实现。执行此操作的最佳方法是什么?
最佳答案
这是一个类似于 Dave 的方法,但使用 Sample
代替(比 buffer 更合适)。我添加了一种与我添加到 Dave 的答案中的方法类似的扩展方法。
扩展名:
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
var sampler = new Subject<Unit>();
var sub = source.
Sample(sampler).
ObserveOn(Scheduler.ThreadPool).
Subscribe(l =>
{
action(l);
sampler.OnNext(Unit.Default);
});
// start sampling when we have a first value
source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
return sub;
}
请注意,它更简单,并且没有触发“空”缓冲区。发送到操作的第一个元素实际上来自流本身。
用法很简单:
messages.SubscribeWithoutOverlap(n =>
{
Console.WriteLine("start: " + n);
Thread.Sleep(500);
Console.WriteLine("end: " + n);
});
messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
和结果:
source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
关于c# - 使用 Rx,如何在我的订阅方法运行时忽略除最新值之外的所有值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11010602/