c# - 等待可观察完成

标签 c# .net async-await system.reactive observable

我有一个方法可以使用 observable 异步完成一些工作。我想知道使此方法异步的最佳方法是什么,以便我能够等待并在可观察完成后做一些工作。 p>

我的第一个尝试是在 observable 上使用 await

public async Task DoWorkAsync()
{
    var observable = Observable.Create<int>(o =>
    {
        Task.Run(() =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("OnNext");
            o.OnNext(1);
            o.OnError(new Exception("exception in observable logic"));
            //o.OnCompleted();
        });    
        return Disposable.Empty;
    });

    //observable = observable.Publish().RefCount();

    observable.Subscribe(i => Console.WriteLine(i));
    Console.WriteLine("Awaiting...");
    await observable;
    Console.WriteLine("After Awaiting...");
}

根据具体情况,我对该方法有不同的问题(+/- 表示这部分代码未注释/已注释):

  1. +OnNext +OnCompleted -OnError -RefCount: OnNext 被调用了 2 次(observable 被订阅了 2 次)。这是我想避免的。

  2. +OnNext +OnCompleted -OnError +RefCount:当我使用 RefCount() 方法时,代码有效。

  3. -OnNext +OnCompleted -OnError +RefCount:当我的 observable 没有引发 OnNext 时抛出“序列不包含元素”异常。

  4. +OnNext -OnCompleted +OnError -RefCount: OnNext 被调用了 2 次。引发异常。

  5. +OnNext -OnCompleted +OnError +RefCount:显示 1 后挂起(可能是因为它想返回到等待的线程)。我们可以使用 SubscribeOn(ThreadPoolScheduler.Instance) 使其工作(并引发异常)

无论如何,如果 observable 为空(没有 OnNext 上升),即使 OnError 没有被调用,我们也会得到异常,并且我们在 observable 逻辑中没有任何异常。这就是为什么等待 observable 不是好的解决方案。

这就是为什么我尝试使用 TaskCompletionSource 的另一种解决方案

public async Task DoWorkAsync()
{
    var observable = Observable.Create<int>(o =>
    {
        Task.Run(() =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("OnNext");
            o.OnNext(1);
            o.OnError(new Exception("exception in observable logic"));
            //o.OnCompleted();
        });
        return Disposable.Empty;
    });

    var tcs = new TaskCompletionSource<bool>();

    observable.Subscribe(i => Console.WriteLine(i),
    e =>
    {
        //tcs.TrySetException(e);
        tcs.SetResult(false);
    },
    () => tcs.TrySetResult(true));

    Console.WriteLine("Awaiting...");
    await tcs.Task;
    Console.WriteLine("After Awaiting...");
}

它在所有情况下都可以正常工作,如果 OnError 被调用,我们可以使用 tcs.SetResult(false) 并且在外部方法中没有关于异常详细信息的信息,或者我们可以使用 tcs .TrySetException(e) 并能够在外部方法中捕获异常。

如果有更好/更清洁的解决方案或者我的第二种解决方案是可行的方法,您能建议我吗?

编辑

所以我想知道是否有比我的第二个解决方案更好的解决方案:

  • 不需要使用.Publish().RefCount()
  • 不需要额外的订阅(在 await observable 中发生了什么 - OnNext 被调用了 2 次)
  • 当然,我可以将我的解决方案包装在一些异步扩展方法中,用于订阅返回任务

最佳答案

编辑:

如果您删除订阅,您可以执行以下操作:

await observable.Do(i => Console.WriteLine(i)).LastOrDefaultAsync();

至于你的任意要求......没有多个订阅冷观察是有道理的;所以你发布它。拒绝使用 .Publish().Refcount() 没有意义。我不明白您为什么拒绝解决您问题的解决方案。


那里有很多,但我假设这是您的主要问题:

Anyway in case when observable is empty (no OnNext rised) we get exception even if OnError is not called and we don't have any exception inside observable logic. Thats why awaiting on observable is not good solution.

await observableawait observable.LastAsync() 相同。因此,如果没有元素,则会出现异常。想象一下,将该语句更改为 int result = await observable; 如果没有元素,result 的值应该是多少?

如果您将 await observable; 更改为 await observable.LastOrDefaultAsync(); 一切都会顺利进行。

是的,您应该使用 .Publish().Refcount()

关于c# - 等待可观察完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42231718/

相关文章:

c# - Windows 10 RTM OSVersion 没有返回我所期望的

c# - 如何拥有 IEnumerable<T>.ToObservableCollection()?

.net - SGEN 错误反射(reflect)类型

c# - 为什么ConcurrentQueue提供TryPeek()和TryDequeue(),而Queue提供Peek()和Dequeue()?

c# - 在 Async/Await 中调试异常(调用堆栈)

c# - 将可变参数传递给委托(delegate)

c# - AppFabric DataCache 按键获取值?

c# - 设置组合框的选定值

javascript - 如何在 Gulp 4 中使用异步/等待?

c# - Async/await 作为协程的替代品