c# - 在方法上创建一个可观察对象

标签 c# .net-core system.reactive

在我的应用程序中,每当服务器上的某个队列发生更新时,我都会调用一个方法。该应用已初始化为以这种方式运行。

现在,每次使用最新数据调用该方法时,我想将其视为事件流的一部分,从而使其成为永不以订阅者结束的 Observable 的一部分。

我面临的挑战是:如何在被调用的方法上创建一个可观察对象?下面是我的示例代码。

//This method is invoked every time an update happens on the server
public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    Observable.Create<MyObject3>(observer =>
        {
            var object3 = new MyObject3(object1, object2);
            observer.OnNext(object3 );

            return Disposable.Empty;
        })
        .Subscribe(x => WriteLine($"Message acknowledged"));
}

但这会在每次调用方法时创建一个可观察对象,这不是我想要的,而且看起来也不是正确的方法。我还读到使用“Subject”或“AsyncSubject”不是解决问题的正确方法。

最佳答案

关于不使用 Subject 的规则更像是一个没有很好表达的指南。

一般来说,如果您在可观察管道使用主题,那么您很可能会做错事 - 应该避免这种情况。

如果您使用Subject 作为可观察and 的来源,您可以正确封装Subject and> 你混淆了它,那么你就没事了。所以这通常意味着使用只有您的代码可以访问的 private 字段(因此没有人可以调用 .OnCompleted() 并调用 .AsObservable() 这样就没有人可以将您的可观察对象转换回底层 Subject

在您的情况下,您直接订阅,因此不需要 .AsObservable(),但我怀疑这只是演示代码。在现实世界中,确保你混淆了。

您的代码应该如下所示:

private Subject<MyObject3> _subject = new Subject<MyObject3>();

private void SetUpObservable()
{
    _subject = new Subject<MyObject3>();
    _subject.Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _subject.OnNext(new MyObject3(object1, object2));
}

现在,如果您仍然想避免使用 Subject,那么您可以这样做:

private Action<MyObject3> _delegate;

private void SetUpObservable()
{
    Observable
        .FromEvent<MyObject3>(h => _delegate += h, h => _delegate -= h)
        .Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _delegate?.Invoke(new MyObject3(object1, object2));
}

在我看来,Subject 可以让您更好地控制并且更容易设置。

无论如何,您可能应该保留订阅 IDisposable,以便您可以正确清理。

关于c# - 在方法上创建一个可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50547570/

相关文章:

c# - 与泛型的反/协方差 - 不能分配给 T

c# - 在 BackgroundWorker 忙时禁用表单?

memory - 你能在 Kubernetes 上的 .NET Core 中触发 OOM 异常的自动内存转储吗?

c# - 有没有办法在 .NET Core 中的 CorsPolicy 上使用 'AllowAnyOrigin' 属性?

c# - 使用 Reactive Extensions 制作 Action<T> 的通用调度程序

c# - IObserver -- 当 IObserver.OnError 被调用时,观察者应该做什么

c# - 为 Border 控件的宽度设置动画

c# - 检查字符串是否包含单词

c# - Renci.SshNet:在30000毫秒内建立连接失败

.net - 证明其有用性的.NET Reactive Framework示例