简介
在我的 WPF C# .NET 应用程序中,我使用响应式扩展 (Rx) 来订阅事件,并且我经常必须从数据库中重新加载一些内容以获取更新 UI 所需的值,因为事件对象通常只包含ID 和一些元数据。
我使用 Rx 调度在后台加载数据并更新调度程序上的 UI。我在 Rx 序列中混合“Task.Run”时遇到了一些不好的经历(当使用“SelectMany”时,顺序不再得到保证,并且很难控制 UnitTests 中的调度)。另请参阅:Executing TPL code in a reactive pipeline and controlling execution via test scheduler
我的问题
如果我关闭我的应用程序(或关闭选项卡),我想取消订阅,然后等待数据库调用(从 Rx“Select”调用),它仍然可以在“subscription.Dispose”之后运行。到目前为止,我还没有找到任何好的实用程序或简单的方法来做到这一点。
问题
是否有任何框架支持等待仍在 Rx 链中运行的一切?
如果没有,您对如何制作一个易于使用的实用程序有什么好的想法吗?
是否有任何好的替代方法可以实现相同的目标?
示例
public async Task AwaitEverythingInARxChain()
{
// In real life this is a hot observable event sequence which never completes
IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();
IDisposable subscription = eventSource
// Load data in the background
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
// Update UI on the dispatcher
.ObserveOn(DispatcherScheduler.Current)
.SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
.Subscribe(loadedData => UpdateUi(loadedData));
Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.
// Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
subscription.Dispose();
await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.
ShutDownDatabase();
}
我已经尝试过(但没有奏效)
- 使用“Finally”运算符设置 TaskCompletionSource 的结果。 这种方法的问题:最终在取消订阅后直接调用并且“LoadFromDatabase”仍然可以运行
更新:带有控制台输出和 TakeUntil 的示例
public async Task Main()
{
Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Subscribe(x =>
{
Console.WriteLine("Cancel started");
_shuttingDown.OnNext(Unit.Default);
});
await AwaitEverythingInARxChain();
Console.WriteLine("Cancel finished");
ShutDownDatabase();
Thread.Sleep(TimeSpan.FromSeconds(3));
}
private Subject<Unit> _shuttingDown = new Subject<Unit>();
public async Task AwaitEverythingInARxChain()
{
IObservable<int> eventSource = Observable.Range(0, 10);
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.ObserveOn(Scheduler.Default)
.TakeUntil(_shuttingDown)
.Do(loadedData => UpdateUi(loadedData));
}
public int LoadFromDatabase(int x)
{
Console.WriteLine("Start LoadFromDatabase: " + x);
Thread.Sleep(1000);
Console.WriteLine("Finished LoadFromDatabase: " + x);
return x;
}
public void UpdateUi(int x)
{
Console.WriteLine("UpdateUi: " + x);
}
public void ShutDownDatabase()
{
Console.WriteLine("ShutDownDatabase");
}
输出(实际):
Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7
预期: 我想保证以下是最后的输出:
Cancel finished
ShutDownDatabase
最佳答案
这比您想象的要容易。您可以 await
可观察对象。所以只需这样做:
public async Task AwaitEverythingInARxChain()
{
IObservable<int> eventSource = Enumerable.Range(1, 10).ToObservable();
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.ObserveOn(DispatcherScheduler.Current)
.Do(loadedData => UpdateUi(loadedData), () => ShutDownDatabase());
}
在您的方法中使用一些 Console.WriteLine
操作,并在 db 调用中休眠一个小线程以模拟网络延迟,我得到以下输出:
LoadFromDatabase: 1 LoadFromDatabase: 2 UpdateUi: 1 LoadFromDatabase: 3 UpdateUi: 2 LoadFromDatabase: 4 UpdateUi: 3 LoadFromDatabase: 5 UpdateUi: 4 LoadFromDatabase: 6 UpdateUi: 5 LoadFromDatabase: 7 UpdateUi: 6 LoadFromDatabase: 8 UpdateUi: 7 LoadFromDatabase: 9 UpdateUi: 8 LoadFromDatabase: 10 UpdateUi: 9 UpdateUi: 10 ShutDownDatabase
If you need to end the query, just create a shuttingDown
subject:
private Subject<Unit> _shuttingDown = new Subject<Unit>();
...然后像这样修改查询:
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.ObserveOn(DispatcherScheduler.Current)
.Do(
loadedData => UpdateUi(loadedData),
() => ShutDownDatabase())
.TakeUntil(_shuttingDown);
您只需发出 _shuttingDown.OnNext(Unit.Default);
即可取消订阅可观察对象。
这是我完整的工作测试代码:
async Task Main()
{
Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Subscribe(x => _shuttingDown.OnNext(Unit.Default));
await AwaitEverythingInARxChain();
}
private Subject<Unit> _shuttingDown = new Subject<Unit>();
public async Task AwaitEverythingInARxChain()
{
IObservable<int> eventSource = Observable.Range(0, 10);
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.ObserveOn(DispatcherScheduler.Current)
.Finally(() => ShutDownDatabase())
.TakeUntil(_shuttingDown)
.Do(loadedData => UpdateUi(loadedData));
}
public int LoadFromDatabase(int x)
{
Console.WriteLine("LoadFromDatabase: " + x);
Thread.Sleep(1000);
return x;
}
public void UpdateUi(int x)
{
Console.WriteLine("UpdateUi: " + x);
}
public void ShutDownDatabase()
{
Console.WriteLine("ShutDownDatabase");
}
我得到这个输出:
LoadFromDatabase: 0 LoadFromDatabase: 1 UpdateUi: 0 LoadFromDatabase: 2 UpdateUi: 1 LoadFromDatabase: 3 UpdateUi: 2 LoadFromDatabase: 4 UpdateUi: 3 LoadFromDatabase: 5 UpdateUi: 4 ShutDownDatabase
请注意,可观察对象试图在 10 秒内生成 10 个值,但它被 OnNext
打断了。
关于c# - 取消订阅后,我如何等待一切都在 Rx 可观察序列中完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42655958/