c# - Parallel.For SendAsync 到 BufferBlock 到异步转换?

标签 c# async-await task-parallel-library tpl-dataflow parallel.for

我正在学习 TPL Dataflow 库。到目前为止,这正是我所寻找的。

我创建了一个简单的类(如下),它执行以下功能

  • 执行 ImportPropertiesForBranch 后,我会转到第 3 方 API 并获取属性列表
  • 返回 xml 列表并将其反序列化为属性数据数组(id、api 端点、lastupdated)。大约有 400 多处特性(如房屋)。
  • 然后,我使用 Parallel.For 将属性数据 SendAsync 发送到我的 propertyBufferBlock
  • propertyBufferBlock 链接到 propertyXmlBlock(它本身是一个 TransformBlock)。
  • 然后,propertyXmlBlock(异步)返回到 API(使用属性数据中提供的 API 端点)并获取属性 xml 进行反序列化。
  • 一旦 xml 返回并可用,我们就可以反序列化
  • 稍后,我将添加更多 TransformBlock 将其保存到数据存储中。

所以我的问题是;

  • 代码中是否存在任何潜在的瓶颈或可能造成麻烦的区域?我知道我没有包含任何错误处理或取消(即将推出)。
  • 是否可以在 TransformBlockawait 异步调用,或者这是一个 瓶颈?
  • 虽然代码有效,但我担心 Parallel.ForBufferBlockTransformBlock 中的异步的缓冲和异步性。我不确定这是最好的方法,我可能混淆了一些概念。

欢迎任何指导、改进和陷阱建议。

using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;

namespace My.ImportService
{
    public class ImportService
    {

        private readonly IApiService _apiService;
        private readonly IXmlService _xmlService;
        private readonly IRepositoryService _repositoryService;

        public ImportService(IApiService apiService,
            IXmlService xmlService,
            IRepositoryService repositoryService)
        {
            _apiService = apiService;
            _xmlService = xmlService;
            _repositoryService = repositoryService;

            ConstructPipeline();
        }

        private BufferBlock<propertiesProperty> propertyBufferBlock;
        private TransformBlock<propertiesProperty, string> propertyXmlBlock;
        private TransformBlock<string, propertyType> propertyDeserializeBlock;
        private ActionBlock<propertyType> propertyCompleteBlock;

        public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
        {
            var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);

            if (string.IsNullOrEmpty(propertyListXml))
                return false;

            var properties = _xmlService.DeserializePropertyList(propertyListXml);

            if (properties?.property == null || properties.property.Length == 0)
                return false;

            // limited to the first 20 for testing
            Parallel.For(0, 20,
                new ParallelOptions {MaxDegreeOfParallelism = 3},
                i => propertyBufferBlock.SendAsync(properties.property[i]));

            propertyBufferBlock.Complete();

            await propertyCompleteBlock.Completion;

            return true;
        }

        private void ConstructPipeline()
        {
            propertyBufferBlock = GetPropertyBuffer();
            propertyXmlBlock = GetPropertyXmlBlock();
            propertyDeserializeBlock = GetPropertyDeserializeBlock();
            propertyCompleteBlock = GetPropertyCompleteBlock();

            propertyBufferBlock.LinkTo(
                propertyXmlBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyXmlBlock.LinkTo(
                propertyDeserializeBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyDeserializeBlock.LinkTo(
                propertyCompleteBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
        }

        private BufferBlock<propertiesProperty> GetPropertyBuffer()
        {
            return new BufferBlock<propertiesProperty>();
        }

        private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
        {
            return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                {
                    Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                    var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                    return propertyXml;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
        {
            return new TransformBlock<string, propertyType>(xmlAsString =>
                {
                    Debug.WriteLine($"deserializing");
                    var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                    return propertyType;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private ActionBlock<propertyType> GetPropertyCompleteBlock()
        {
            return new ActionBlock<propertyType>(propertyType =>
                {
                    Debug.WriteLine($"complete {propertyType.id}");
                    Debug.WriteLine(propertyType.address.display);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }
    }
}

最佳答案

你实际上以错误的方式做了一些事情:

i => propertyBufferBlock.SendAsync(properties.property[i])

您需要 await 方法,否则您将创建太多并发任务。

还有这一行:

MaxDegreeOfParallelism = 1

将限制 block 的执行以进行后续执行,这可能会降低您的性能。

正如您在评论中所说,您切换到同步方法Post,并通过设置BoundedCapacity来限制 block 的容量。应谨慎使用此变体,因为您需要检查它的返回值,该值表明消息是否已被接受。

至于您担心等待 block 内的 async 方法 - 这绝对没问题,并且应该像在 async 方法使用的其他情况下一样完成。

关于c# - Parallel.For SendAsync 到 BufferBlock 到异步转换?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44086034/

相关文章:

c# - 这是 Partitioner.Create(int fromInclusive, int toExclusive) 中的错误吗?

c# - ConstructorInfo.GetParameters 是线程安全的吗?

c# - 为什么我的 GTK# 编译二进制文件(使用 MonoDevelop 创建)不能在 Windows 上运行?

c# - 如何从 Windows 8 中的 KnowFolders 检索文件?

node.js - 如何比较 : GET Data from 2 APIs,,POST bool

c# - 使用 async/await 锁定资源

c# - 在任务并行库场景中按正确顺序记录

c# - 使用 EPPlus 如何生成电子表格,其中数字是数字而不是文本

c# - MediatR 错误 : Register your handlers with the container

c# - 以列表的形式访问所有可用的 ViewComponents