c# - 在创建未知数量的可观察量时将其组合/合并在一起

标签 c# .net system.reactive

我想做的是:

  • 调用一个函数 (DoWork),作为其工作的一部分,该函数将通过多个 Worker 类订阅多个热输入
  • 在调用函数之前订阅 DoWork 订阅的所有更新
  • 完成后,处理所有订阅
  • DoWork 完成之前,很可能至少会触发一个传入事件。

问题:

  • Subject 是执行此操作的正确方法吗?感觉应该有更好的办法?
  • 如何确保一旦 Main 中的订阅被释放,所有 incomingX 订阅也被释放 - 即 Main应该控制所有订阅的生命周期。

    void Main()
    {
        var worker = new Worker();
        using (worker.UpdateEvents.Subscribe(x => Console.WriteLine()))
        {
            worker.DoWork();
        }
    }
    
    public class Worker1
    {
        private readonly Subject<string> updateEvents = new Subject<string>();
    
        public IObservable<string> UpdateEvents { get { return updateEvents; } }
    
        public void DoWork()
        {
            // Do some work
            // subscribe to a hot observable (events coming in over the network)
            incoming1.Subscribe(updateEvents);
    
            var worker2 = new Worker2();
            worker2.UpdateEvents.Subscribe(updateEvents);
            worker2.DoWork();
        }
    }
    
    public class Worker2
    {
        private readonly Subject<string> updateEvents = new Subject<string>();
    
        public IObservable<string> UpdateEvents { get { return updateEvents; } }
    
        public void DoWork()
        {
            // Do some work
            // subscribe to some more events
            incoming2.Subscribe(updateEvents);
    
            var workerN = new WorkerN();
            workerN.UpdateEvents.Subscribe(updateEvents);
            workerN.DoWork();
        }
    }
    

最佳答案

James 的回答(使用 SubjectMerge)捕获了问题的本质。这个答案提供了一种模式,我发现在这种情况下很有用(基于您对詹姆斯答案的评论)。

本质上,该模式是让您的工作线程公开一个 IObservable,调用者将在调用 DoWork 之前订阅该IObservable。但这种 API(在调用 B 之前调用 A)是有问题的,因为它引入了时间耦合。

为了消除时间耦合,您最终将工作线程本身转变为一个冷的 Observable,当调用者订阅时,它会隐式调用 DoWork。一旦您认识到冷可观察量的强大功能以及在观察者订阅时使用 Observable.Create 采取行动的能力,您就可以创建 Rx 链,而无需达到 主题。这是一个基于您的原始代码的示例。

Worker 很简单。它仅订阅 incoming1Worker2Worker2 稍微复杂一些。它订阅 incoming2,执行一些额外的工作,然后最终订阅 WorkerN

始终保持正确的 OnErrorOnCompleted 逻辑,而您的原始代码示例无法做到这一点。这意味着 Main 看到的可观察流在所有传入流和工作流完成之前不会完成。但是,只要任何传入流或工作流失败,Main 就会失败。您的代码示例多次调用 Subscribe(someSubject) 将导致 Subject 在任何传入的 流完成。

public class Worker1
{
    public IObservable<string> UpdateEvents { get; private set; };

    public Worker1()
    {
        // Each time someone subscribes, create a new worker2 and subscribe to the hot events as well as whatever worker2 produces.
        UpdateEvents = Observable.Create(observer =>
        {
            var worker2 = new Worker2();
            return incoming1.Merge(worker2.UpdateEvents).Subscribe(observer);
        });
    }
}

public class Worker2
{
    public IObservable<string> UpdateEvents { get; private set; };

    public Worker2()
    {
        // Each time someone subscribes, create a new worker and subscribe to the hot events as well as whatever worker2 produces.
        UpdateEvents = Observable.Create(observer =>
        {
            // maybe this version needs to do some stuff after it has subscribed to incoming2 but before it subscribes to workerN:
            var doWorkThenSubscribeToWorker = Observable.Create(o =>
            {
                DoWork(o);
                var worker = new WorkerN();
                return worker.UpdateEvents.Subscribe(o);
            }

            return incoming2.Merge(doWorkThenSubscribeToWorker).Subscribe(observer);
        });
    }

    private void DoWork(IObserver<string> observer)
    {
        // do some work
        observer.OnNext("result of work");
    }
}


void Main()
{
    var worker = new Worker();
    worker.UpdateEvents.Do(x => Console.WriteLine()).Wait();
}

关于c# - 在创建未知数量的可观察量时将其组合/合并在一起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26335846/

相关文章:

C# 文件移动(重命名) 多线程 Windows 操作系统

c# - 图片不在 Chrome 和 FF 上显示

c# - 将 MahApps.Metro 主题和重音应用于其他控件或矩形

.net - 使用 DataGridView 中的 DataGridViewComboBoxColumn 将绑定(bind)字段的值设置为 NULL

c# - Observable.Interval 等待 Action 完成

c# - DataSet 主键列是否已编入索引?

c# - Blazor:降低/消除图表的动画速度

.net - Debug.Fail和Debug.Assert

c# - 如何获取 IObservable<IObservable<T>> 的最新变化事件?

multithreading - F# Rx 扩展 IObservable 并发事件