让我们考虑以下流
SomeState state = new SomeState().
_refreshFiberStream =
Stream()
.SubscribeOn(new EventLoopScheduler())
.Select(DoCalc)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(Update);
DoCalc 方法将转换到输入并使用“state”,并将结果输出提供给 Update 方法,后者将修改“state”。如果出现新事件,它应该根据上一个事件的最后更新状态进行操作,并基于此进行项目。
我正在寻找一种始终按顺序执行事件的方法。例如,如果我有三个事件,我正在寻找一种方法,以便它们在 DoCalc 中执行,Update 后跟 DoCalc,Update 后跟 DoCalc,Update。
相反,我看到的是 DoCalc、DoCalc、Update、Update、DoCalc、Update,即它们从不按顺序运行。
有没有办法在 Rx 中强制执行它
最佳答案
我看到一方面需要按顺序执行,另一方面需要分派(dispatch)到另一个线程。我的建议是将 Update
分成两部分:
- 需要顺序执行的部分(Update)
- 需要派发的部分(Dispatch)
然后你可以依次调用Do(Update)
,然后在dispatcher上调用Subscribe(Dispatch)
:
var result =
Stream()
.SubscribeOn(new EventLoopScheduler())
.Select(DoCalc)
.Do(Update)
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(Dispatch);
结果序列如下(“Dispatch n”调用可能发生在“Update n”之后的任何时间):
Select a Update a Select b Update b Dispatch a Dispatch b
I guess an alternative would be to use a ManualResetEvent
, which dictates that the next DoCalc can only proceed once the Update has occurred. You could do this by adding ManualResetEvent.WaitOne
to DoCalc, and ManualResetEvent.Set
to Update:
private ManualResetEvent _wait = new ManualResetEvent(true);
private string DoCalc(string input)
{
_wait.WaitOne();
Console.WriteLine("Selected {0}", input);
_wait.Reset();
return input;
}
private void Update(string input)
{
Console.WriteLine("Update {0}", input);
_wait.Set();
}
第二种方法“有效”,但像这样的线程阻塞让我感到不安——它似乎与响应式(Reactive)编程的目的相反。当然,同样,it's best to avoid introducing state, if possible.
关于c# - 在 Reactive Extensions 中一次处理一个事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19191086/