c# - 写一个Rx "RetryAfter"扩展方法

标签 c# system.reactive

在书中IntroToRx作者建议为 I/O 编写一个“智能”重试,它会在一段时间后重试 I/O 请求,例如网络请求。

这是确切的段落:

A useful extension method to add to your own library might be a "Back Off and Retry" method. The teams I have worked with have found such a feature useful when performing I/O, especially network requests. The concept is to try, and on failure wait for a given period of time and then try again. Your version of this method may take into account the type of Exception you want to retry on, as well as the maximum number of times to retry. You may even want to lengthen the to wait period to be less aggressive on each subsequent retry.

不幸的是,我不知道如何编写这个方法。 :(

最佳答案

此回退重试实现的关键是 deferred observables .延迟的可观察对象在有人订阅它之前不会执行它的工厂。它将为每个订阅调用工厂,使其成为我们重试场景的理想选择。

假设我们有一个触发网络请求的方法。

public IObservable<WebResponse> SomeApiMethod() { ... }

为了这个小片段的目的,让我们将 deferred 定义为 source

var source = Observable.Defer(() => SomeApiMethod());

每当有人订阅源时,它都会调用 SomeApiMethod 并启动新的网络请求。每当它失败时重试它的天真的方法是使用内置的 Retry 运算符。

source.Retry(4)

虽然这对 API 来说不是很好,但这不是您所要求的。我们需要在每次尝试之间延迟请求的启动。一种方法是使用 delayed subscription .

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

这并不理想,因为它会在第一次请求时增加延迟,让我们解决这个问题。

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

只是暂停一秒钟并不是一个很好的重试方法,所以让我们将该常量更改为一个接收重试计数并返回适当延迟的函数。指数退避很容易实现。

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

我们现在差不多完成了,我们只需要添加一种方法来指定我们应该重试哪些异常。让我们添加一个函数,在给定异常的情况下返回重试是否有意义,我们将其称为 retryOnError。

现在我们需要编写一些看起来很可怕的代码,但请耐心等待。

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

所有这些尖括号都用于编码一个我们不应该重试超过 .Retry() 的异常。 .我们将内部可观察对象设为 IObservable<Tuple<bool, WebResponse, Exception>>其中第一个 bool 表示我们是否有响应或异常。如果 retryOnError 表明我们应该重试一个特定的异常,内部可观察对象将抛出并且将被重试拾取。 SelectMany 只是解开我们的元组并使结果可观察到 IObservable<WebRequest>再次。

查看我的 gist with full source and tests对于最终版本。有了这个运算符,我们就可以非常简洁地编写重试代码

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )

关于c# - 写一个Rx "RetryAfter"扩展方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18978523/

相关文章:

c# - IEnumerable 集合列表

c# - 将最新值与可观察流中的先前值相结合

c# - Rx DelayOnTrue 扩展实现

c# - Rx,动态合并源

c# - 如何为异步等待调用创建包装器?

c# - 如何防止提交后清除输入的密码?

c# - 为什么 DateTime.Now.ToBinary() 返回与构造函数创建时不同的值

C# 2.0 解析 Excel 电子表格的最快方法

c# - 处理除集合之外的绑定(bind)事件,以及在删除C#时取消绑定(bind)的事件-多线程

system.reactive - 如何在 Rx 中使用新的 BufferWithTimeOrCount 返回 IObservable<IObservable<T>> 而不是 IObservable<IList<T>>