当调用onNext并且订阅者在另一个线程/任务上时,如何等待订阅者方法完成?我想念诸如await onNext()之类的东西对于主题...
例如,在同一线程上运行时:
[Test, Explicit]
public void TestSyncOnSameTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject
.Subscribe(
o => Console.WriteLine("Received {1} on threadId:{0}",
Thread.CurrentThread.ManagedThreadId,
o),
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
将打印输出:
Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10
在单独的线程上运行时(例如,卸载UI线程)
[Test, Explicit]
public void TestObserveOnSeparateTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(
o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
},
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
将显示以下结果作为输出:
Starting on threadId:10
Received 3 on threadId:11
如您所见...留下了using块,这是因为主题调用OnNext()立即返回,并且不等待订户Task的完成。
如何“等待”订户调用?订单得到保证吗?我没有找到有关Synchronize()方法以及该方法真正同步内容的良好文档。
我在寻找类似的东西:
await subject.OnNext(3);
await subject.OnNext(2);
在“老派方式”中,我使用BlockingCollection作为队列,
public async Task<IResponse> Request(IRequest request)
{
var taskCompletionSource = new TaskCompletionSource<IResponse>();
Task<IResponse> tsk = taskCompletionSource.Task;
Action<IResponse> responseHandler = taskCompletionSource.SetResult;
this.requestProcessor.Process(request, responseHandler);
return await tsk.ConfigureAwait(false);
}
而requestprocessor就像这样:
while (true)
{
//blockingCollection is shared between caller and runner
blockingCollection.TryTake(out request, Timeout.Infinite);
// call callback when finished to return to awaited method
}
现在,我想改用RX/Observables,但是这个卸载话题让我大吃一惊...它更多地是关于使用Reactive Extensions发布/订阅,但我认为我在这里需要诸如request/response之类的东西。用RX可以做到这一点吗?
最佳答案
要完成您想完成的任务,可以执行以下操作:
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
using (
subject
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
}, () =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
are.WaitOne();
}
}
Console.WriteLine("Done.");
关于c# - 如何等待已结束的IObserver调用,包括观察订户调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37988412/