c# - 使用 HttpClient 和 Polly 发送并行请求,但每个主机只有一个,以优雅地处理 429 响应

标签 c# .net-core web-crawler tpl-dataflow polly

简介:

我正在构建一个单节点网络爬虫来简单地验证 URL 是 200 OK在 .NET Core 控制台应用程序中。我在不同的主机上有一组 URL,我正在向这些主机发送请求 HttpClient .我对使用 Polly 和 TPL Dataflow 还很陌生。

要求:

  • 我想支持与一个并行发送多个 HTTP 请求
    可配置 MaxDegreeOfParallelism .
  • 我想将任何给定主机的并行请求数限制为 1(或可配置)。这是为了优雅地处理每个主机 429 TooManyRequests使用 Polly 策略进行响应。或者,我可以使用断路器在收到一个 429 时取消对同一主机的并发请求。响应然后一次一个地处理该特定主机?
  • 我完全没有使用 TPL 数据流,而是使用 Polly Bulkhead 或其他一些机制来限制并行请求,但我不确定为了实现需求 #2 的配置会是什么样子。

  • 当前实现:

    我当前的实现有效,只是我经常看到我会有 x对同一主机的并行请求返回 429大约在同一时间......然后,他们都暂停重试策略......然后,他们都在同一时间再次猛击同一台主机,经常仍然收到429 s。即使我在整个队列中均匀分布同一主机的多个实例,我的 URL 集合也会因一些仍然开始生成 429 的特定主机而超重。 s 最终。

    收到后429 ,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200 s。

    验证器方法:
    public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
    {
        var validator = new TransformBlock<Uri, bool>(
            async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode,
            new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism}
        );
        foreach (var url in urls)
            await validator.SendAsync(url, cancellationToken);
        validator.Complete();
        var validUrlCount = 0;
        while (await validator.OutputAvailableAsync(cancellationToken))
        {
            if(await validator.ReceiveAsync(cancellationToken))
                validUrlCount++;
        }
        await validator.Completion;
        return validUrlCount;
    }
    

    应用于 GetValidCount() 中使用的 HttpClient 实例的 Polly 策略以上。
    IAsyncPolicy<HttpResponseMessage> waitAndRetryTooManyRequests = Policy
        .HandleResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.TooManyRequests)
        .WaitAndRetryAsync(3,
            (retryCount, response, context) =>
                response.Result?.Headers.RetryAfter.Delta ?? TimeSpan.FromMilliseconds(120),
            async (response, timespan, retryCount, context) =>
            {
                // log stuff
            });
    

    问题:

    我如何修改或替换此解决方案以增加对需求 #2 的满意度?

    最佳答案

    我会尝试引入某种标志 LimitedMode检测此特定客户端是否以受限模式进入。下面我声明了两个策略 - 一个简单的重试策略只是为了捕获 TooManyRequests 并设置标志。第二个策略是开箱即用的 BulkHead政策。

        public void ConfigureServices(IServiceCollection services)
        {
            /* other configuration */
    
            var registry = services.AddPolicyRegistry();
    
            var catchPolicy = Policy.HandleResult<HttpResponseMessage>(r =>
                {
                    LimitedMode = r.StatusCode == HttpStatusCode.TooManyRequests;
                    return false;
                })
                .WaitAndRetryAsync(1, i => TimeSpan.FromSeconds(3)); 
    
            var bulkHead = Policy.BulkheadAsync<HttpResponseMessage>(1, 10, OnBulkheadRejectedAsync);
    
            registry.Add("catchPolicy", catchPolicy);
            registry.Add("bulkHead", bulkHead);
    
            services.AddHttpClient<CrapyWeatherApiClient>((client) =>
            {
                client.BaseAddress = new Uri("hosturl");
            }).AddPolicyHandlerFromRegistry(PolicySelector);
        }
    

    然后,您可能希望使用 PolicySelector 动态决定应用哪个策略。机制:如果限制模式处于事件状态 - 使用 catch 429 策略包装散头策略。如果收到成功状态代码 - 切换回没有隔板的常规模式。
        private IAsyncPolicy<HttpResponseMessage> PolicySelector(IReadOnlyPolicyRegistry<string> registry, HttpRequestMessage request)
        {
            var catchPolicy = registry.Get<IAsyncPolicy<HttpResponseMessage>>("catchPolicy");
            var bulkHead = registry.Get<IAsyncPolicy<HttpResponseMessage>>("bulkHead");
            if (LimitedMode)
            {
                return catchPolicy.WrapAsync(bulkHead);
            }
    
            return catchPolicy;
        }        
    

    关于c# - 使用 HttpClient 和 Polly 发送并行请求,但每个主机只有一个,以优雅地处理 429 响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57022754/

    相关文章:

    Docker - RabbitMQ.Client.Exceptions.BrokerUnreachableException : 'None of the specified endpoints were reachable'

    java - 多线程网络爬虫的最快架构

    python - 如何在python icrawler中使用搜索关键字重命名爬虫文件

    java - Java 8 java.util.function.Consumer<> 的 c# 等价物是什么?

    c# - 将 View 渲染为字符串缓存问题

    c# - AWS SQS,如何计算消息属性的 MD5 消息摘要

    java - 为爬虫爬取的URL生成唯一的Hash

    c# - 规范化真的会影响高流量站点的性能吗?

    c# - NPOI HSSF 与 SS 命名空间

    c# - Visual Studio 调试器 - 是我的问题还是这个调试器本身充满了错误?