c# - Request/response pattern with TPL Dataflow

Tags: c# .net task-parallel-library tpl-dataflow

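We are having a problem with the TPL Dataflow library where we need a request/response pattern. We have a .NET Core API that calls a dependent service. The dependent service limits concurrent requests, but our API does not, so we can receive thousands of requests at once. In that case the dependent service starts rejecting requests once it reaches its limit. To handle this, we implemented a BufferBlock<T> and a TransformBlock<TIn, TOut>. Performance is solid and it works well: we tested our API front end with 1,000 users issuing 100 requests per second with zero issues. The buffer block buffers the requests, and the transform block executes as many requests in parallel as we need. The dependent service receives our requests and responds, we return that response from the transform block's action, and everything works.

Our problem is that the buffer block and the transform block are disconnected, which means requests and responses are not synchronized. We have run into cases where one request receives another requester's response (see the code below).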

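Specifically, in the code below, our problem is in the GetContent method. It is called from the service layer of our API, which in turn is called from our Controller. Both the code below and the service layer are singletons. The SendAsync to the buffer block is disconnected from the ReceiveAsync on the transform block, so an arbitrary response is returned, not necessarily the one for the request that was sent.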
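To illustrate why responses get mixed up: a TransformBlock hands each output to whichever caller asks for one next, and it has no notion of which caller sent which input. The sketch below (a hypothetical `MismatchDemo` class, with a string stand-in for the real HTTP call) simulates two callers sequentially so the outcome is deterministic: caller A sends first, but caller B happens to reach `ReceiveAsync` first and gets A's response.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MismatchDemo
{
    // Returns (what caller A received, what caller B received).
    public static async Task<(string, string)> RunAsync()
    {
        // Stand-in for ProcessRequest: outputs come out in input order by default.
        var block = new TransformBlock<string, string>(s => "response to " + s);

        // Caller A sends first, then caller B.
        await block.SendAsync("A");
        await block.SendAsync("B");

        // Caller B reaches ReceiveAsync before caller A does,
        // so it is handed the first available output: A's response.
        string receivedByB = await block.ReceiveAsync();
        string receivedByA = await block.ReceiveAsync();
        return (receivedByA, receivedByB);
    }
}
```

Nothing ties an output to the caller that produced its input, which is exactly the gap the answer below closes.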

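So our question is: is there a way to correlate requests and responses using dataflow blocks? The end goal is for a request to come into our API, be sent to the dependent service, and have the response returned to that same client. Our dataflow implementation is below.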

public class HttpClientWrapper : IHttpClientManager
{
    private readonly IConfiguration _configuration;
    private readonly ITokenService _tokenService;
    private HttpClient _client;

    private BufferBlock<string> _bufferBlock;
    private TransformBlock<string, JObject> _actionBlock;

    public HttpClientWrapper(IConfiguration configuration, ITokenService tokenService)
    {
        _configuration = configuration;
        _tokenService = tokenService;

        _bufferBlock = new BufferBlock<string>();

        var executionDataFlowBlockOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10
        };

        var dataFlowLinkOptions = new DataflowLinkOptions
        {
            PropagateCompletion = true
        };

        _actionBlock = new TransformBlock<string, JObject>(t => ProcessRequest(t),
            executionDataFlowBlockOptions);

        _bufferBlock.LinkTo(_actionBlock, dataFlowLinkOptions);
    }

    public void Connect()
    {
        _client = new HttpClient();

        _client.DefaultRequestHeaders.Add("x-ms-client-application-name",
            "ourappname");
    }

    public async Task<JObject> GetContent(string request)
    {
        await _bufferBlock.SendAsync(request);

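        // Problem: ReceiveAsync returns the transform block's next output,
        // which may belong to a different caller's request.
        var result = await _actionBlock.ReceiveAsync();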

        return result;
    }

    private async Task<JObject> ProcessRequest(string request)
    {
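        // Note: this check is not thread-safe; with MaxDegreeOfParallelism = 10,
        // several concurrent calls may each create their own HttpClient.
        if (_client == null)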
        {
            Connect();
        }

        try
        {
            var accessToken = await _tokenService.GetTokenAsync(_configuration);

            var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post,
                new Uri($"https://{_configuration.Uri}"));

            // add the headers
            httpRequestMessage.Headers.Add("Authorization", $"Bearer {accessToken}");
            // add the request body
            httpRequestMessage.Content = new StringContent(request, Encoding.UTF8,
                "application/json");

            var postRequest = await _client.SendAsync(httpRequestMessage);

            var response = await postRequest.Content.ReadAsStringAsync();

            return JsonConvert.DeserializeObject<JObject>(response);
        }
        catch (Exception ex)
        {
            // log error

            return new JObject();
        }
    }
}

Best answer

What you need to do is tag each incoming item with an id so that you can correlate each input with its output. Here is an example of how to do that:

namespace ConcurrentFlows.DataflowJobs {
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    /// A generic interface defining that:
    /// for a specified input type => an awaitable result is produced.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public interface IJobManager<TInput, TOutput> {
        Task<TOutput> SubmitRequest(TInput data);
    }

    /// <summary>
    /// A TPL-Dataflow based job manager.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {

        /// <summary>
        /// It is anticipated that jobHandler is an injected
        /// singleton instance of a Dataflow based 'calculator', though this implementation
        /// does not depend on it being a singleton.
        /// </summary>
        /// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
        public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
            if (jobHandler == null) { throw new ArgumentNullException(nameof(jobHandler)); }

            this.JobHandler = jobHandler;
            if (!alreadyLinked) {
                JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
                alreadyLinked = true;
            }
        }

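        // Static, so it is shared by every DataflowJobManager<TInput, TOutput>: only the first
        // instance's jobHandler gets linked to ResultHandler (and the check is not thread-safe).
        private static bool alreadyLinked = false;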

        /// <summary>
        /// Submits the request to the JobHandler and asynchronously awaits the result.
        /// </summary>
        /// <param name="data">The input data to be processed.</param>
        /// <returns>A task that completes with the result produced for <paramref name="data"/>.</returns>
        public async Task<TOutput> SubmitRequest(TInput data) {
            var taggedData = TagInputData(data);
            var job = CreateJob(taggedData);
            Jobs.TryAdd(job.Key, job.Value);
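            // If the handler declines the message (e.g. it has completed), remove the job
            // instead of awaiting a task that would never finish.
            if (!await JobHandler.SendAsync(taggedData)) {
                Jobs.TryRemove(job.Key, out _);
                throw new InvalidOperationException("The job handler declined the request.");
            }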
            return await job.Value.Task;
        }

        private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
            get;
        } = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();

        private static ExecutionDataflowBlockOptions Options {
            get;
        } = GetResultHandlerOptions();

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
            get;
        } = CreateReplyHandler(Options);

        private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
            get;
        }

        private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
            var id = Guid.NewGuid();
            return new KeyValuePair<Guid, TInput>(id, data);
        }

        private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
            var id = taggedData.Key;
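            // RunContinuationsAsynchronously keeps the awaiting caller's code off the ResultHandler thread.
            var jobCompletionSource = new TaskCompletionSource<TOutput>(TaskCreationOptions.RunContinuationsAsynchronously);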
            return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
        }

        private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
            return new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 1000
            };
        }

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
            return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
                ReceiveOutput(result);
            }, options);
        }

        private static void ReceiveOutput(KeyValuePair<Guid, TOutput> result) {
            var jobId = result.Key;
            TaskCompletionSource<TOutput> jobCompletionSource;
            if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
                // Unknown id: log and ignore it. Throwing here would fault ResultHandler,
                // and every job submitted afterwards would never complete.
                return;
            }
            var resultValue = result.Value;
            jobCompletionSource.TrySetResult(resultValue);
        }
    }
}
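For comparison, here is a condensed sketch of the same id-tagging idea with all state kept per instance rather than in static members, wired for the question's throttling scenario. The class name `CorrelatedThrottler`, the `process` delegate (standing in for the question's `ProcessRequest`) and `SubmitAsync` are hypothetical. As written it assumes `process` never throws, as the question's `ProcessRequest` catches all exceptions; an exception would fault the worker block and leave later callers waiting.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical class: per-instance id tagging, not the answer's exact code.
public sealed class CorrelatedThrottler<TIn, TOut>
{
    // Pending jobs, keyed by the id attached to each request.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TOut>> _jobs =
        new ConcurrentDictionary<Guid, TaskCompletionSource<TOut>>();

    private readonly TransformBlock<KeyValuePair<Guid, TIn>, KeyValuePair<Guid, TOut>> _worker;
    private readonly ActionBlock<KeyValuePair<Guid, TOut>> _resultHandler;

    public CorrelatedThrottler(Func<TIn, Task<TOut>> process, int maxDegreeOfParallelism)
    {
        // Runs at most maxDegreeOfParallelism calls at once and carries the id along with the result.
        // Assumes process does not throw; an exception would fault this block.
        _worker = new TransformBlock<KeyValuePair<Guid, TIn>, KeyValuePair<Guid, TOut>>(
            async job => new KeyValuePair<Guid, TOut>(job.Key, await process(job.Value)),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        // Completes the task of the caller that owns each result's id.
        _resultHandler = new ActionBlock<KeyValuePair<Guid, TOut>>(result =>
        {
            if (_jobs.TryRemove(result.Key, out var tcs))
            {
                tcs.TrySetResult(result.Value);
            }
        });

        _worker.LinkTo(_resultHandler, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public async Task<TOut> SubmitAsync(TIn input)
    {
        var id = Guid.NewGuid();
        var tcs = new TaskCompletionSource<TOut>(TaskCreationOptions.RunContinuationsAsynchronously);
        _jobs[id] = tcs; // register before sending so the result can never arrive first

        if (!await _worker.SendAsync(new KeyValuePair<Guid, TIn>(id, input)))
        {
            _jobs.TryRemove(id, out _);
            throw new InvalidOperationException("The worker block declined the request.");
        }
        return await tcs.Task;
    }
}
```

Keeping the dictionary and result handler per instance avoids the shared static state and the `alreadyLinked` flag, at the cost of requiring one throttler instance per pipeline (a singleton in the question's setup).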

See also this answer for reference.
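A different technique, not the one in the accepted answer: instead of tagging items with ids and looking them up in a dictionary, each message can carry the TaskCompletionSource of the caller that sent it, and an ActionBlock with MaxDegreeOfParallelism completes it directly. There is no output side to receive from, so a response cannot reach the wrong caller. The names below (`TcsThrottler`, `SubmitAsync`, the `process` delegate) are hypothetical; this is a sketch under those assumptions.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical class: each message carries the completion source of the caller that sent it.
public sealed class TcsThrottler<TIn, TOut>
{
    private readonly ActionBlock<(TIn Input, TaskCompletionSource<TOut> Reply)> _worker;

    public TcsThrottler(Func<TIn, Task<TOut>> process, int maxDegreeOfParallelism)
    {
        _worker = new ActionBlock<(TIn Input, TaskCompletionSource<TOut> Reply)>(
            async message =>
            {
                try
                {
                    message.Reply.TrySetResult(await process(message.Input));
                }
                catch (Exception ex)
                {
                    // The failure goes to the caller that owns this message; the block keeps running.
                    message.Reply.TrySetException(ex);
                }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });
    }

    public async Task<TOut> SubmitAsync(TIn input)
    {
        var reply = new TaskCompletionSource<TOut>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!await _worker.SendAsync((input, reply)))
        {
            throw new InvalidOperationException("The worker block declined the request.");
        }
        return await reply.Task;
    }
}
```

In the question's HttpClientWrapper, `process` would play the role of `ProcessRequest` and `maxDegreeOfParallelism` would be 10, so GetContent could simply return `await SubmitAsync(request)` on a single shared instance.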

Regarding c# - Request/response pattern with TPL Dataflow, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50120304/
