c# - IObservable - 如何发送/发布/推送新值到集合

标签 c# .net asynchronous system.reactive

我想从我的服务层公开一个 IObservable。

为简单起见,假设服务层在内部从远程服务器(通过套接字)获取消息,并且套接字库需要一个 IMessageReponse 对象,该对象具有要传递给它的 MessageReceived 方法。

服务层在内部创建一个 MessageResponse 对象,并在消息到达时通过 Action 回调得到通知。

鉴于这种设计,我需要能够将新消息推送到 IObservable,但在我见过的任何示例中,Observable.XYZ 似乎不支持简单的发送/发布/推送方法...

在这种情况下如何连接我的 Observable.XYZ???

我想要这样的东西...请注意,我知道这是 IObservable 的一个非常基本的实现,但我不会想到我需要自己编写这段代码...我本以为会发生一些事情开箱即用。

public class PushObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _listeners = new List<IObserver<T>>();

    public void Send(T value)
    {
        foreach (var listener in _listeners) 
            listener.OnNext(value); 
    }

    public IDisposable Subscribe(IObserver<T> observer)
    { 
        _listeners.Add(observer);
    }
}

最佳答案

您重写了一个已经存在的对象!您的“PushObservable”实际上是 Subject<T> ,它是 Rx 中的基本对象之一。

如果你真的想以 Rx 的方式考虑这个问题,你可能会从 IObservable<byte[]> 开始。来自套接字的,然后您将选择它进入 IObservable<IMessageResponse> ,因为归根结底,您要响应的事件是从网络中取出的字节。

关于c# - IObservable - 如何发送/发布/推送新值到集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5161206/

相关文章:

c# - 为什么条件运算符不能正确地允许使用 "null"来分配给可为 null 的类型?

objective-c - 在 Objective-C 中同步异步任务

c# - 任务变量 ContinueWith,稍后等待?

c# - .NET 中 native 互操作性的代码组织

java - MSIL 和 Java 字节码的区别?

c# - 分块加载表

c# - 在 ASP.NET MVC 中通过重定向传递复杂对象?

javascript - 未捕获的语法错误 : await is only valid in async function in async function

c# - 类项目未从主项目加载 log4net.config

c# - 无法将默认比较器作为 IComparer<object>