c# - 使用 Reactive Extensions 进行数据库轮询

标签 c# .net system.reactive reactive-programming

我必须及时查询数据库以了解遗留系统的状态。我想过将查询包装在 Observable 周围,但我不知道正确的方法。

基本上,每 5 秒就会进行相同的查询。但恐怕我将不得不面对这些问题:

  • 如果查询的执行需要 10 秒怎么办?我不想 如果前一个查询仍在处理中,则执行任何新查询。
  • 另外,应该有一个超时。如果当前查询没有执行 例如,20 秒后,应显示一条信息性消息 记录并应发送新的尝试(相同的查询)。

额外的细节:

  • 查询只是一个 SELECT,它返回一个包含状态代码列表的数据集(workingfaulted)。
  • Observable 序列将始终获取从查询中接收到的最新数据,类似于 Switch 扩展方法。
  • 我想将数据库查询(长时间操作)包装到任务中,但我不确定这是否是最佳选择。

我几乎可以肯定查询应该在另一个线程中执行,但我不知道可观察对象应该是什么样子,读过 Introduction to Rx by Lee Campbell .

最佳答案

这是使用 Rx 轮询另一个系统的相当经典的案例。大多数人会使用 Observable.Interval 作为他们的首选运算符,而且对于大多数人来说这很好。

但是您对超时和重试有特定要求。在这种情况下,我认为您最好结合使用运算符:

  • Observable.Timer 允许您在指定时间执行查询
  • Timeout 识别和数据库查询已溢出
  • ToObservable() 将您的 Task 结果映射到可观察序列。
  • 重试 允许您在超时后恢复
  • Repeat 允许您在成功查询数据库后继续。这也将保持上一个数据库查询完成与下一个数据库查询开始之间的初始时间段/间隙。

这个工作 LINQPad代码片段应向您显示查询正常工作:

void Main()
{
    var pollingPeriod = TimeSpan.FromSeconds(5);
    var dbQueryTimeout = TimeSpan.FromSeconds(10);

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });

    var query = Observable.Timer(pollingPeriod, scheduler)
                    .SelectMany(_ => DatabaseQuery().ToObservable())
                    .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                    .Retry()    //Loop on errors
                    .Repeat();  //Loop on success

    query.StartWith("Seed")
        .TimeInterval(scheduler)    //Just to debug, print the timing gaps.
        .Dump();
}

// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
    //Oscillate the delay between 3 and 12 seconds
    delay += delayModifier;
    var timespan = TimeSpan.FromSeconds(delay);
    if (delay < 4 || delay > 11)
        delayModifier *= -1;
    timespan.Dump("delay");
    await Task.Delay(timespan);
    return "Value";
}

结果如下:

Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606

样本的关键部分是....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
                .SelectMany(_ => DatabaseQuery().ToObservable())
                .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                .Retry()    //Loop on errors
                .Repeat();  //Loop on success

编辑: 以下是对如何得出此解决方案的进一步说明。 https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

关于c# - 使用 Reactive Extensions 进行数据库轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33714408/

相关文章:

c# - 2 个 IObservable 上的响应式(Reactive) linq 表达式

c# - 使用 .NET Rx 观察随 Action 事件改变的属性

c# - 偏移foreach循环

c# - LINQ left join + default if empty + 匿名类型 + group by

c# - DataGridView 在添加行时抛出 "InvalidOperationException: Operation is not valid..."

.net - 在 Entity Framework 4.1 Code First 中附加缓存的断开连接的实体

c# - 如何使用 LINQ 来匹配单词

c# - List<T> FirstOrDefault() 性能不佳 - 在这种情况下可以使用字典吗?

.net - 依赖问题在大型应用程序中实现用于语义日志记录的 EventSource

c# - 使用响应式(Reactive)扩展来配对请求和响应