我最近一直在阅读有关 IObservable 的内容。到目前为止,我已经查看了各种 SO 问题,并观看了有关他们可以做什么的视频。我认为整个“推送”机制非常棒,但我仍在努力弄清楚每件事到底做了什么。根据我的阅读,我想在某种程度上 IObservable
是可以“观察”的东西,而 IObservers
是“观察者”。
所以现在我要尝试在我的应用程序中实现它。在我开始之前,有几件事我想弄清楚。我已经看到 IObservable 与 IEnumerable 相反,但是,在我的特定实例中我真的看不到任何可以合并到我的应用程序中的地方。
目前,我大量使用事件,以至于我可以看到“管道”开始变得难以管理。我想,IObservable 可以帮我解决这个问题。
考虑以下设计,它是我在我的应用程序中围绕我的 I/O 的包装器(仅供引用,我通常必须处理字符串):
我有一个名为 IDataIO
的基本接口(interface):
public interface IDataIO
{
event OnDataReceived;
event OnTimeout:
event OnTransmit;
}
现在,我目前有三个实现此接口(interface)的类,这些类中的每一个都以某种方式利用异步方法调用,引入某种类型的多线程处理:
public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;
这些类中的每一个都有一个实例包含在我的最终类中,称为 IO(它也实现了 IDataIO - 遵循我的策略模式):
public class IO : IDataIO
{
public SerialIO Serial;
public UdpIO Udp;
public TcpIO Tcp;
}
我已经利用策略模式来封装这三个类,因此当在运行时在不同的 IDataIO
实例之间进行更改时,它对最终用户来说是“不可见的”。正如您所想象的,这在后台导致了相当多的“事件管道”。
那么,在我的案例中如何使用“推送”通知?我不想订阅事件(DataReceived 等),而是想简单地将数据推送给任何感兴趣的人。我有点不确定从哪里开始。我仍在尝试玩弄 Subject
的想法/通用类,以及它的各种化身 (ReplaySubject/AsynSubject/BehaviourSubject)。有人可以请教我这个(可能引用我的设计)吗?或者这根本不是 IObservable
的理想选择?
附言。请随时纠正我的任何“误解”:)
最佳答案
Observables 非常适合表示数据流,因此您的 DataReceived
事件可以很好地模拟可观察的模式,例如 IObservable<byte>
或 IObservable<byte[]>
.您还可以获得 OnError
的额外好处和 OnComplete
这很方便。
在实现方面,很难说出您的具体情况,但我们经常使用 Subject<T>
作为基础资源并调用 OnNext
推送数据。也许像
// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();
private void OnPort_DataReceived(object sender, EventArgs e)
{
// This pushes the data to the IObserver, which is probably just a wrapper
// around your subscribe delegate is you're using the Rx extensions
this.subject.OnNext(port.Data); // pseudo code
}
然后您可以通过属性公开主题:
public IObservable<byte> DataObservable
{
get { return this.subject; } // Or this.subject.AsObservable();
}
您可以替换您的 DataReceived
事件 IDataIO
用IObservable<T>
并让每个策略类以他们需要的方式处理他们的数据并推送到 Subject<T>
.
另一方面,订阅 Observable 的任何人都可以像处理事件一样处理它(只需使用 Action<byte[]>
),或者您可以使用 Select
在流上执行一些非常有用的工作。 , Where
, Buffer
等
private IDataIO dataIo = new ...
private void SubscribeToData()
{
dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}
private void On16Bytes(IList<byte> bytes)
{
// do stuff
}
ReplaySubject
/ConnectableObservable
当您知道您的订阅者将迟到但仍需要 catch 所有事件时,这非常有用。源缓存它推送的所有内容并为每个订阅者重播所有内容。只有你能说这是否是你真正需要的行为(但要小心,因为它会缓存所有会明显增加你的内存使用的东西)。
当我学习 Rx 时,我发现了 http://leecampbell.blogspot.co.uk/关于 Rx 的博客系列对于理解理论非常有用(帖子现在有点过时了,API 已经改变,所以要注意这一点)
关于c# - 使用 IObservable 代替事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11189944/