c# - 如何等待已结束的IObserver调用,包括观察订户调用?

标签 c# .net multithreading system.reactive

当调用onNext并且订阅者在另一个线程/任务上时,如何等待订阅者方法完成?我想念诸如await onNext()之类的东西对于主题...

例如,在同一线程上运行时:

    [Test, Explicit]
    public void TestSyncOnSameTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject
            .Subscribe(
                o => Console.WriteLine("Received {1} on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId,
                    o),
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将打印输出:
Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10

在单独的线程上运行时(例如,卸载UI线程)
    [Test, Explicit]
    public void TestObserveOnSeparateTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject.ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(
                o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                         Task.Delay(1000 * o);
                },
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将显示以下结果作为输出:
Starting on threadId:10
Received 3 on threadId:11

如您所见...留下了using块,这是因为主题调用OnNext()立即返回,并且不等待订户Task的完成。

如何“等待”订户调用?订单得到保证吗?我没有找到有关Synchronize()方法以及该方法真正同步内容的良好文档。

我在寻找类似的东西:
await subject.OnNext(3);
await subject.OnNext(2);

在“老派方式”中,我使用BlockingCollection作为队列,
    public async Task<IResponse> Request(IRequest request)
    {
        var taskCompletionSource = new TaskCompletionSource<IResponse>();
        Task<IResponse> tsk = taskCompletionSource.Task;
        Action<IResponse> responseHandler = taskCompletionSource.SetResult; 
        this.requestProcessor.Process(request, responseHandler);
        return await tsk.ConfigureAwait(false);
    }

而requestprocessor就像这样:
        while (true)
        {
            //blockingCollection is shared between caller and runner
            blockingCollection.TryTake(out request, Timeout.Infinite);
            // call callback when finished to return to awaited method
        }

现在,我想改用RX/Observables,但是这个卸载话题让我大吃一惊...它更多地是关于使用Reactive Extensions发布/订阅,但我认为我在这里需要诸如request/response之类的东西。用RX可以做到这一点吗?

最佳答案

要完成您想完成的任务,可以执行以下操作:

Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
    using (
        subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                Task.Delay(1000 * o);
            }, () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
            are.WaitOne();
        }
}
Console.WriteLine("Done.");

关于c# - 如何等待已结束的IObserver调用,包括观察订户调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37988412/

相关文章:

c# - "Go To Definition"接口(interface)实现的等效快捷方式?

c# - 如何获得复杂的枚举值字符串表示

c# - 如何调试打印第三方可执行文件的流程

c# - 从 C++ 调用 C# dll 时如何使用 app.config 文件

C++ 套接字接收混合消息

编译csapp.c和csapp.h

C# 多重赋值

c# - 使用 ServiceStack 的 Swagger UI 传递 header

c# - 实现没有 CA 警告的继承 IDisposable 模式

c# - 为什么 lock(this) {...} 不好?