我想做的是:
- 调用一个函数 (
DoWork
),作为其工作的一部分,该函数将通过多个Worker
类订阅多个热输入 - 在调用函数之前订阅
DoWork
订阅的所有更新 - 完成后,处理所有订阅
- 在
DoWork
完成之前,很可能至少会触发一个传入事件。
问题:
Subject
是执行此操作的正确方法吗?感觉应该有更好的办法?如何确保一旦
Main
中的订阅被释放,所有incomingX
订阅也被释放 - 即Main
应该控制所有订阅的生命周期。void Main() { var worker = new Worker(); using (worker.UpdateEvents.Subscribe(x => Console.WriteLine())) { worker.DoWork(); } } public class Worker1 { private readonly Subject<string> updateEvents = new Subject<string>(); public IObservable<string> UpdateEvents { get { return updateEvents; } } public void DoWork() { // Do some work // subscribe to a hot observable (events coming in over the network) incoming1.Subscribe(updateEvents); var worker2 = new Worker2(); worker2.UpdateEvents.Subscribe(updateEvents); worker2.DoWork(); } } public class Worker2 { private readonly Subject<string> updateEvents = new Subject<string>(); public IObservable<string> UpdateEvents { get { return updateEvents; } } public void DoWork() { // Do some work // subscribe to some more events incoming2.Subscribe(updateEvents); var workerN = new WorkerN(); workerN.UpdateEvents.Subscribe(updateEvents); workerN.DoWork(); } }
最佳答案
James 的回答(使用 Subject
和 Merge
)捕获了问题的本质。这个答案提供了一种模式,我发现在这种情况下很有用(基于您对詹姆斯答案的评论)。
本质上,该模式是让您的工作线程公开一个 IObservable
,调用者将在调用 DoWork
之前订阅该IObservable
。但这种 API(在调用 B 之前调用 A)是有问题的,因为它引入了时间耦合。
为了消除时间耦合,您最终将工作线程本身转变为一个冷的 Observable,当调用者订阅时,它会隐式调用 DoWork
。一旦您认识到冷可观察量的强大功能以及在观察者订阅时使用 Observable.Create
采取行动的能力,您就可以创建 Rx 链,而无需达到 主题
。这是一个基于您的原始代码的示例。
Worker
很简单。它仅订阅 incoming1
和 Worker2
。
Worker2
稍微复杂一些。它订阅 incoming2
,执行一些额外的工作,然后最终订阅 WorkerN
。
始终保持正确的 OnError
、OnCompleted
逻辑,而您的原始代码示例无法做到这一点。这意味着 Main
看到的可观察流在所有传入流和工作流完成之前不会完成
。但是,只要任何传入流或工作流失败,Main
就会失败。您的代码示例多次调用 Subscribe(someSubject)
将导致 Subject
在任何传入的
流完成。
public class Worker1
{
public IObservable<string> UpdateEvents { get; private set; };
public Worker1()
{
// Each time someone subscribes, create a new worker2 and subscribe to the hot events as well as whatever worker2 produces.
UpdateEvents = Observable.Create(observer =>
{
var worker2 = new Worker2();
return incoming1.Merge(worker2.UpdateEvents).Subscribe(observer);
});
}
}
public class Worker2
{
public IObservable<string> UpdateEvents { get; private set; };
public Worker2()
{
// Each time someone subscribes, create a new worker and subscribe to the hot events as well as whatever worker2 produces.
UpdateEvents = Observable.Create(observer =>
{
// maybe this version needs to do some stuff after it has subscribed to incoming2 but before it subscribes to workerN:
var doWorkThenSubscribeToWorker = Observable.Create(o =>
{
DoWork(o);
var worker = new WorkerN();
return worker.UpdateEvents.Subscribe(o);
}
return incoming2.Merge(doWorkThenSubscribeToWorker).Subscribe(observer);
});
}
private void DoWork(IObserver<string> observer)
{
// do some work
observer.OnNext("result of work");
}
}
void Main()
{
var worker = new Worker();
worker.UpdateEvents.Do(x => Console.WriteLine()).Wait();
}
关于c# - 在创建未知数量的可观察量时将其组合/合并在一起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26335846/