c# - 取消订阅后,我如何等待一切都在 Rx 可观察序列中完成?

标签 c# .net asynchronous async-await system.reactive

简介

在我的 WPF C# .NET 应用程序中,我使用响应式扩展 (Rx) 来订阅事件,并且我经常必须从数据库中重新加载一些内容以获取更新 UI 所需的值,因为事件对象通常只包含ID 和一些元数据。

我使用 Rx 调度在后台加载数据并更新调度程序上的 UI。我在 Rx 序列中混合“Task.Run”时遇到了一些不好的经历(当使用“SelectMany”时,顺序不再得到保证,并且很难控制 UnitTests 中的调度)。另请参阅:Executing TPL code in a reactive pipeline and controlling execution via test scheduler

我的问题

如果我关闭我的应用程序(或关闭选项卡),我想取消订阅,然后等待数据库调用(从 Rx“Select”调用),它仍然可以在“subscription.Dispose”之后运行。到目前为止,我还没有找到任何好的实用程序或简单的方法来做到这一点。

问题

是否有任何框架支持等待仍在 Rx 链中运行的一切

如果没有,您对如何制作一个易于使用的实用程序有什么好的想法吗?

是否有任何好的替代方法可以实现相同的目标?

示例

public async Task AwaitEverythingInARxChain()
{
    // In real life this is a hot observable event sequence which never completes
    IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();

    IDisposable subscription = eventSource
        // Load data in the background
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))

        // Update UI on the dispatcher
        .ObserveOn(DispatcherScheduler.Current)
        .SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
        .Subscribe(loadedData => UpdateUi(loadedData));

    Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.

    // Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
    subscription.Dispose();

    await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.

    ShutDownDatabase();
}

我已经尝试过(但没有奏效)

  • 使用“Finally”运算符设置 TaskCompletionSource 的结果。 这种方法的问题:最终在取消订阅后直接调用并且“LoadFromDatabase”仍然可以运行

更新:带有控制台输出和 TakeUntil 的示例

public async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x =>
        {
            Console.WriteLine("Cancel started");
            _shuttingDown.OnNext(Unit.Default);
        });

    await AwaitEverythingInARxChain();
    Console.WriteLine("Cancel finished");
    ShutDownDatabase();
    Thread.Sleep(TimeSpan.FromSeconds(3));
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(Scheduler.Default)
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("Start LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    Console.WriteLine("Finished LoadFromDatabase: " + x);

    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

输出(实际):

Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7

预期: 我想保证以下是最后的输出:

Cancel finished
ShutDownDatabase

最佳答案

这比您想象的要容易。您可以 await 可观察对象。所以只需这样做:

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Enumerable.Range(1, 10).ToObservable();

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Do(loadedData => UpdateUi(loadedData), () => ShutDownDatabase());
}

在您的方法中使用一些 Console.WriteLine 操作,并在 db 调用中休眠一个小线程以模拟网络延迟,我得到以下输出:

LoadFromDatabase: 1
LoadFromDatabase: 2
UpdateUi: 1
LoadFromDatabase: 3
UpdateUi: 2
LoadFromDatabase: 4
UpdateUi: 3
LoadFromDatabase: 5
UpdateUi: 4
LoadFromDatabase: 6
UpdateUi: 5
LoadFromDatabase: 7
UpdateUi: 6
LoadFromDatabase: 8
UpdateUi: 7
LoadFromDatabase: 9
UpdateUi: 8
LoadFromDatabase: 10
UpdateUi: 9
UpdateUi: 10
ShutDownDatabase

If you need to end the query, just create a shuttingDown subject:

private Subject<Unit> _shuttingDown = new Subject<Unit>();

...然后像这样修改查询:

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Do(
            loadedData => UpdateUi(loadedData),
            () => ShutDownDatabase())
        .TakeUntil(_shuttingDown);

您只需发出 _shuttingDown.OnNext(Unit.Default); 即可取消订阅可观察对象。


这是我完整的工作测试代码:

async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x => _shuttingDown.OnNext(Unit.Default));

    await AwaitEverythingInARxChain();
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Finally(() => ShutDownDatabase())
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

我得到这个输出:

LoadFromDatabase: 0
LoadFromDatabase: 1
UpdateUi: 0
LoadFromDatabase: 2
UpdateUi: 1
LoadFromDatabase: 3
UpdateUi: 2
LoadFromDatabase: 4
UpdateUi: 3
LoadFromDatabase: 5
UpdateUi: 4
ShutDownDatabase

请注意,可观察对象试图在 10 秒内生成 10 个值,但它被 OnNext 打断了。

关于c# - 取消订阅后,我如何等待一切都在 Rx 可观察序列中完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42655958/

相关文章:

c# - 使用 HTTPS 的 WCF 的最小示例

c# - 如何获取UWP中Content属性的名称?

javascript - 使用异步库组装数据库中的所有标签

node.js - 如何递归地遍历目录,通过 node.js 中的套接字发送所有文件名?

c# - 序列化 System.Globalization.CultureInfo 类型的对象时检测到循环引用

c# - 将 FormView 中的列表绑定(bind)到 ASP.net webforms 中的模型

c# - 遍历excel表中特定列号的所有行并对每一行值做一些处理

.net - Windows Workflow Foundation 或 IoC 容器 + 依赖注入(inject)?

c# - 设备响应式 CSS

javascript - async/await 比 promises 慢吗?