c# - Retry policy in ITargetBlock<TInput>

Tags: c# task-parallel-library tpl-dataflow

I need to introduce a retry policy into my workflow. Say there are 3 blocks connected like this:

    var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
    var buffer = new BufferBlock<int>();
    var processing = new TransformBlock<int, int>(..., executionOptions);
    var send = new ActionBlock<int>(...);

    buffer.LinkTo(processing);
    processing.LinkTo(send);

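So there is a buffer that accumulates data and passes it to a transform block that processes no more than 3 items at a time, and the transform block then sends its results to an action block.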
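For reference, here is a self-contained sketch of the pipeline described above. The doubling transform, the `ConcurrentBag` sink and the `PipelineSketch` class are hypothetical stand-ins for the elided delegates; `PropagateCompletion` is added (it is not in the original snippet) so that completing `buffer` flows through to `send`.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Builds buffer -> processing -> send and pushes the given items through it.
    // The doubling transform and the bag sink are placeholders (assumptions).
    public static async Task<int[]> RunAsync(int[] items)
    {
        var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
        var sent = new ConcurrentBag<int>();

        var buffer = new BufferBlock<int>();
        var processing = new TransformBlock<int, int>(async x =>
        {
            await Task.Delay(10); // simulate I/O-bound work
            return x * 2;
        }, executionOptions);
        var send = new ActionBlock<int>(x => sent.Add(x));

        // PropagateCompletion lets Complete() on the head flow down the chain.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(processing, linkOptions);
        processing.LinkTo(send, linkOptions);

        foreach (var item in items) buffer.Post(item);
        buffer.Complete();
        await send.Completion; // completes after every item has been sent

        return sent.OrderBy(x => x).ToArray();
    }
}
```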

Transient errors can occur while the transform block is processing, and when an error is transient I want to retry the block several times.

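I know that blocks in general are not retryable (the delegates passed into the blocks can be). One option is to wrap the delegate that is passed in so that it supports retrying.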
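A minimal sketch of the delegate-wrapping option, assuming a hypothetical `RetryWrapper.WithRetry` helper (it is not part of TPL Dataflow or TransientFaultHandling.Core). It calls the wrapped delegate up to `maxAttempts` times, waiting a fixed `retryDelay` after each failure:

```csharp
using System;
using System.Threading.Tasks;

public static class RetryWrapper
{
    // Hypothetical helper: returns a delegate that calls `transform` up to
    // `maxAttempts` times, waiting `retryDelay` after each failed attempt.
    public static Func<TInput, Task<TOutput>> WithRetry<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int maxAttempts, TimeSpan retryDelay)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        return async input =>
        {
            for (int attempt = 1; ; attempt++)
            {
                try
                {
                    return await transform(input).ConfigureAwait(false);
                }
                catch (Exception) when (attempt < maxAttempts)
                {
                    // The caller (e.g. a TransformBlock worker) stays busy during this wait.
                    await Task.Delay(retryDelay).ConfigureAwait(false);
                }
            }
        };
    }
}
```

Passed as `new TransformBlock<int, int>(RetryWrapper.WithRetry(transform, 3, delay), executionOptions)`, each retrying item keeps one of the three `MaxDegreeOfParallelism` slots occupied for the whole delay, because the block only considers the item finished when the wrapped task completes.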

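I also know there is a very good library, TransientFaultHandling.Core, that provides retry mechanisms for transient faults. It is an excellent library, but it does not fit my case. If I wrap the delegate passed to the transform block in the RetryPolicy.ExecuteAsync method, the message inside the transform block stays locked until the retry either completes or fails, and meanwhile the transform block cannot receive new messages. Imagine that all 3 messages enter the retry (say the next retry attempt is 2 minutes away) and fail: the transform block is stuck until at least one message leaves it.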

The only solution I see is to extend TransformBlock (actually, ITargetBlock would be enough too) and do the retry manually (like here):
    do
    {
        try { return await transform(input); }
        catch
        {
            if (numRetries <= 0) throw;
            // Schedule a delayed re-post of the message instead of waiting here
            else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
        }
    } while (numRetries-- > 0);

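That is, put the message back into the transform block after a delay. But in that case the retry context (the number of remaining retries, etc.) would also have to be passed into the block. That sounds too complex...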
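The re-posting approach can be sketched by letting each message carry its own retry context. Everything here is hypothetical: the `Attempt<T>` record (C# 9 or later), the `RepostSketch` class and its parameters. A failed item frees its slot immediately and is re-posted after `retryDelay` with one attempt fewer. The `pending` counter is the extra bookkeeping this approach needs: the block cannot be completed while delayed re-posts are still scheduled, because `Post` on a completed block is declined.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type: the item plus the number of attempts it has left.
public sealed record Attempt<T>(T Item, int AttemptsLeft);

public static class RepostSketch
{
    public static async Task<(int[] Succeeded, int[] Failed)> RunAsync(
        int[] items, Func<int, Task<int>> transform, int maxAttempts, TimeSpan retryDelay)
    {
        var succeeded = new ConcurrentBag<int>();
        var failed = new ConcurrentBag<int>();
        int pending = items.Length; // items that have neither succeeded nor given up
        var allDone = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (pending == 0) allDone.TrySetResult(true);

        void MarkDone()
        {
            if (Interlocked.Decrement(ref pending) == 0) allDone.TrySetResult(true);
        }

        TransformManyBlock<Attempt<int>, int> processing = null;
        processing = new TransformManyBlock<Attempt<int>, int>(async attempt =>
        {
            try
            {
                return new[] { await transform(attempt.Item) };
            }
            catch (Exception) when (attempt.AttemptsLeft > 1)
            {
                // Free the slot now and re-post the item later with one attempt fewer.
                var retry = attempt with { AttemptsLeft = attempt.AttemptsLeft - 1 };
                _ = Task.Delay(retryDelay).ContinueWith(t => processing.Post(retry));
                return Array.Empty<int>();
            }
            catch (Exception)
            {
                failed.Add(attempt.Item); // out of attempts: report as failed
                MarkDone();
                return Array.Empty<int>();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

        var sink = new ActionBlock<int>(result =>
        {
            succeeded.Add(result);
            MarkDone();
        });
        processing.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in items) processing.Post(new Attempt<int>(item, maxAttempts));

        // Complete only when every item is resolved, so no delayed Post is declined.
        await allDone.Task;
        processing.Complete();
        await sink.Completion;

        return (succeeded.OrderBy(x => x).ToArray(), failed.OrderBy(x => x).ToArray());
    }
}
```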

Does anyone see a simpler way to implement a retry policy for workflow blocks?

Best answer

Here are two methods, CreateRetryTransformBlock and CreateRetryActionBlock, that operate under these assumptions:

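  • The caller wants all items to be processed, even if some of them fail repeatedly.
  • The caller is interested in knowing about every exception that occurred, even for items that eventually succeeded (not applicable to CreateRetryActionBlock).
  • The caller may want to set an upper limit on the total number of retries, after which the block should transition to a faulted state.
  • The caller wants to be able to set all the options available on a normal block, including MaxDegreeOfParallelism, BoundedCapacity, CancellationToken and EnsureOrdered, on top of the options related to the retry functionality.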

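  • The implementation below uses a SemaphoreSlim to control the level of concurrency between operations that are attempted for the first time and previously faulted operations that are retried after their delay has elapsed.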
    public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
    {
        /// <summary>The limit after which an item is returned as failed.</summary>
        public int MaxAttemptsPerItem { get; set; } = 1;
        /// <summary>The delay duration before retrying an item.</summary>
        public TimeSpan RetryDelay { get; set; } = TimeSpan.Zero;
        /// <summary>The limit after which the block transitions to a faulted
        /// state (unlimited is the default).</summary>
        public int MaxRetriesTotal { get; set; } = -1;
    }
    
    public readonly struct RetryResult<TInput, TOutput>
    {
        public readonly TInput Input { get; }
        public readonly TOutput Output { get; }
        public readonly bool Success { get; }
        public readonly Exception[] Exceptions { get; }
    
        public bool Failed => !Success;
        public Exception FirstException => Exceptions != null ? Exceptions[0] : null;
        public int Attempts =>
            Exceptions != null ? Exceptions.Length + (Success ? 1 : 0) : 1;
    
        public RetryResult(TInput input, TOutput output, bool success,
            Exception[] exceptions)
        {
            Input = input;
            Output = output;
            Success = success;
            Exceptions = exceptions;
        }
    }
    
    public class RetryLimitException : Exception
    {
        public RetryLimitException(string message, Exception innerException)
            : base(message, innerException) { }
    }
    
    public static IPropagatorBlock<TInput, RetryResult<TInput, TOutput>>
        CreateRetryTransformBlock<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform,
        RetryExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        if (dataflowBlockOptions == null)
            throw new ArgumentNullException(nameof(dataflowBlockOptions));
        int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
        int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
        TimeSpan retryDelay = dataflowBlockOptions.RetryDelay;
        if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
            nameof(dataflowBlockOptions.MaxAttemptsPerItem));
        if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
            nameof(dataflowBlockOptions.MaxRetriesTotal));
        if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
            nameof(dataflowBlockOptions.RetryDelay));
        var cancellationToken = dataflowBlockOptions.CancellationToken;
    
        var exceptionsCount = 0;
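        // MaxDegreeOfParallelism can be DataflowBlockOptions.Unbounded (-1),
        // which SemaphoreSlim rejects, so treat it as Int32.MaxValue
        int maxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism;
        var semaphore = new SemaphoreSlim(
            maxDegreeOfParallelism == DataflowBlockOptions.Unbounded
                ? Int32.MaxValue : maxDegreeOfParallelism);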
    
        async Task<(TOutput, Exception)> ProcessOnceAsync(TInput item)
        {
            await semaphore.WaitAsync(); // Preserve the SynchronizationContext
            try
            {
                var result = await transform(item).ConfigureAwait(false);
                return (result, null);
            }
            catch (Exception ex)
            {
                if (maxRetriesTotal != -1)
                {
                    if (Interlocked.Increment(ref exceptionsCount) > maxRetriesTotal)
                    {
                        throw new RetryLimitException($"The max retry limit " +
                            $"({maxRetriesTotal}) has been reached.", ex);
                    }
                }
                return (default, ex);
            }
            finally
            {
                semaphore.Release();
            }
        }
    
        async Task<Task<RetryResult<TInput, TOutput>>> ProcessWithRetryAsync(
            TInput item)
        {
            // Creates a two-stage operation. Preserves the context on every await.
            var (result, firstException) = await ProcessOnceAsync(item);
            if (firstException == null) return Task.FromResult(
                new RetryResult<TInput, TOutput>(item, result, true, null));
            return RetryStageAsync();
    
            async Task<RetryResult<TInput, TOutput>> RetryStageAsync()
            {
                var exceptions = new List<Exception>();
                exceptions.Add(firstException);
                for (int i = 2; i <= maxAttemptsPerItem; i++)
                {
                    await Task.Delay(retryDelay, cancellationToken);
                    var (result, exception) = await ProcessOnceAsync(item);
                    if (exception != null)
                        exceptions.Add(exception);
                    else
                        return new RetryResult<TInput, TOutput>(item, result,
                            true, exceptions.ToArray());
                }
                return new RetryResult<TInput, TOutput>(item, default, false,
                    exceptions.ToArray());
            }
        }
    
        // The input block awaits the first stage of each operation
        var input = new TransformBlock<TInput, Task<RetryResult<TInput, TOutput>>>(
            item => ProcessWithRetryAsync(item), dataflowBlockOptions);
    
        // The output block awaits the second (and final) stage of each operation
        var output = new TransformBlock<Task<RetryResult<TInput, TOutput>>,
            RetryResult<TInput, TOutput>>(t => t, dataflowBlockOptions);
    
        input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });
    
        // In case of failure ensure that the input block is faulted too,
        // so that its input/output queues are emptied, and any pending
        // SendAsync operations are aborted
        PropagateFailure(output, input);
    
        return DataflowBlock.Encapsulate(input, output);
    
        async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
        {
            try { await block1.Completion.ConfigureAwait(false); }
            catch (Exception ex) { block2.Fault(ex); }
        }
    }
    
    public static ITargetBlock<TInput> CreateRetryActionBlock<TInput>(
        Func<TInput, Task> action,
        RetryExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        var block = CreateRetryTransformBlock<TInput, object>(async input =>
        {
            await action(input).ConfigureAwait(false); return null;
        }, dataflowBlockOptions);
        var nullTarget = DataflowBlock.NullTarget<RetryResult<TInput, object>>();
        block.LinkTo(nullTarget);
        return block;
    }
    

Regarding c# - Retry policy in ITargetBlock<TInput>, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/17469689/
