c# - HttpClient 请求限制器和缓冲区的实现

标签 c# asp.net .net multithreading dotnet-httpclient

在我们的项目中,我们有一些服务使用 key 向第 3 方 API 发出请求。

这个 API 在所有端点之间有一个共享的速率限制(意味着对一个端点的请求需要 2 秒的冷却时间,然后我们才能使用不同的端点)。

我们使用定时后台作业处理了这个问题,任何时候只向其中一个端点发出请求。

经过一些架构的重新设计,我们已经到了一个我们不太依赖定时后台作业的地方,现在所有的 HttpRequests 都不能被调节,因为多个服务实例正在向 API 发出请求。

因此,在我们当前的示例中:

我们为所有需要的 API 端点设置了几个 HttpClient,即:

   services.AddHttpClient<Endpoint1Service>(client =>
   {
       client.BaseAddress = new Uri(configOptions.Services.Endpoint1.Url);
   });

   services.AddHttpClient<Endpoint2Service>(client =>
   {
       client.BaseAddress = new Uri(configOptions.Services.Endpoint2.Url);
   });

Endpoint1Service 和 Endpoint2Service 在被后台作业服务访问之前:

    public async Task DoJob()
    {
        var items = await _repository.GetItems();

        foreach (var item in items)
        {
            var processedResult = await _endpoint1Service.DoRequest(item);

            await Task.Delay(2000);

            //...
        }

        // save all results
    }

但是现在这些“端点”服务是并发访问的,每次都会创建一个新实例,因此无法降低请求率。

一种可能的解决方案是创建某种单例请求缓冲区,将其注入(inject)到使用此 API 的所有服务中,并调节这些请求以给定的速率发出。我看到的问题是将请求存储在内存缓冲区中似乎很危险,以防出现问题。

这是我应该关注的方向,还是我可以尝试的其他方向?

最佳答案

希望对您有所帮助: 我为类似的场景创建了以下内容。它的目标是并发节流多线程。然而,它也为您提供了一个方便的请求处理管道包装器。此外,它还提供了每个客户端的最大并发请求数限制(如果你想使用它的话)。

为每个端点服务创建一个实例,如果您希望限制为 1,则将其线程数设置为 1。如果您希望对给定端点的 4 个并发请求,请将其设置为 4。

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Threading/APIProcessor/AsyncThreadedWorkItemProcessor.cs

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Threading/APIProcessor/ThreadedWorkItemProcessor.cs

这两种实现是可以互换的。如果在 Web 服务器上下文中使用,前者可能更好,因为如果使用前台线程,它会卸载到后台线程池。

示例用法 在您的情况下,如果您希望在 1 个并发请求时限制它的速率,则可能将 _maxWorkerThreads 设置为 1 的值。如果您想将其速率限制为 4 个并发请求,请将其设置为 4。

//Example Usage for WebAPI controller
class Example
{
    private static ThreadedWorkItemProcessor<DummyRequest, DummyResponse, int, WorkItemPriority> ThreadedProcessorExample = new ThreadedWorkItemProcessor<DummyRequest, DummyResponse, int, WorkItemPriority>(
            _maxWorkItemLimitPerClient: 100 // Maximum number of concurrent requests in the processing queue per client. Set to int.MaxValue to disable concurrent request caps
            , _maxWorkerThreads: 16 // Maximum number of threads to scale upto
            , _threadStartupPerWorkItems: 4 // Consider starting a new processing thread ever X requests
            , _threadStartupMinQueueSize: 4 // Do NOT start a new processing thread if work item queue is below this size
            , _idleWorkerThreadExitSeconds: 10 // Idle threads will exit after X seconds
            , _abandonedResponseExpirySeconds: 60 // Expire processed work items after X seconds (Maybe the client terminated or the web request thread died)
            , _processRequestMethod: ProcessRequestMethod // Your Do Work method for processing the request
            , _logErrorMethod: Handler_LogError
            , _logMessageMethod: Handler_LogMessage
            );

    public async Task<DummyResponse> GetResponse([FromBody] DummyRequest _request)
    {
        int clientID = 1; //Replace with the client ID from your authentication mechanism if using per client request caps. Otherwise just hardcode to maybe 0 or whatever
        WorkItemPriority _priority;
        _priority = WorkItemPriority.Medium; //Assign the priority based on whatever prioritization rules.
        int RequestID = ThreadedProcessorExample.ScheduleWorkItem(_priority, _request, clientID);
        if (RequestID < 0)
        {
            //Client has exceeded maximum number of concurrent requests or Application Pool is shutting down
            //return a suitable error message here
            return new DummyResponse() { ErrorMessage = @"Maximum number of concurrent requests exceeded or service is restarting. Please retry request later." };
        }

        //If you need the result (Like in a webapi controller) then do this
        //Otherwise if it is say a backend processing sink where there is no client waiting for a response then we are done here. just return.

        KeyValuePair<bool, ThreadWorkItem<DummyRequest, DummyResponse, int>> workItemResult;

        workItemResult = await ThreadedProcessorExample.TryGetProcessedWorkItemAsync(RequestID,
            _timeoutMS: 1000, //Timeout of 1 second
            _taskWaitType: ThreadProcessorAsyncTaskWaitType.Delay_Specific,
            _delayMS: 10);
        if (!workItemResult.Key)
        {
            //Processing timeout or Application Pool is shutting down
            //return a suitable error message here
            return new DummyResponse() { ErrorMessage = @"Internal system timeout or service is restarting. Please retry request later." };
        }
        return workItemResult.Value.Response;
    }

    public static DummyResponse ProcessRequestMethod(DummyRequest request)
    {
        // Process the request and return the response
        return new DummyResponse() { orderID = request.orderID };
    }
    public static void Handler_LogError(Exception ex)
    {
        //Log unhandled exception here
    }

    public static void Handler_LogMessage(string Message)
    {
        //Log message here
    }
}

关于c# - HttpClient 请求限制器和缓冲区的实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71872023/

相关文章:

.net - Protobuf-net - 序列化 .NET GUID - 如何在 C++ 中读取它?

c# - Visual Studio 2017 无法加载项目

ASP.NET 和 HTML5 视频播放时间

c# - 为什么 ASP.NET MVC 2 和 3 在工具箱中有 gridview?

c# - SQL Server Express 版本问题

c# - 如何从 DropDownList 中获取最后一个选项值?

c# - 在某些字符前后修剪字符串

c# - UserControl 的事件处理程序未触发

c# - 为什么抽象类实现接口(interface)?

c# - 我无法将数据从 c# 应用程序保存到我的 sql 数据库,没有错误