c# - Reporting when an input to the first dataflow block has passed through all linked blocks

Tags: c# tpl-dataflow

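I am using TPL Dataflow to download data from a ticketing system.

The system takes a ticket number as input, calls an API, and receives a nested JSON response containing various information. Once the response arrives, a set of blocks processes each level of the nested structure and writes it to a relational database: for example conversations, conversation attachments, users, user photos, user tags, and so on.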

JSON

{
    "conversations":[
        {
            "id":12345,
            "authorid":23456,
            "body":"First Conversation"
        },
        {
            "id":98765,
            "authorid":34567,
            "body":"Second Conversation",
            "attachments":[
                {
                    "attachmentid":12345,
                    "attachment_name":"Test.jpg"
                }
            ]
        }
    ],
    "users":[
        {
            "userid":12345,
            "user_name":"John Smith"
        },
        {
            "userid":34567,
            "user_name":"Joe Bloggs",
            "user_photo":
            {
                "photoid":44556,
                "photo_name":"headshot.jpg"
            },
            "tags":[
                "development",
                "vip"
            ]
        }
    ]
}

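Code

Some blocks need to broadcast so that the deeper levels of nesting still have access to the data. For example, UserModelJson is broadcast so that one block can handle writing users, one block can handle writing user tags, and one block can handle writing user photos.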

var loadTicketsBlock = new TransformBlock<int, ConversationsModelJson>(async ticketNumber => await p.GetConversationObjectFromTicket(ticketNumber));
var broadcastConversationsObjectBlock = new BroadcastBlock<ConversationsModelJson>(conversations => conversations);

//Conversation
var getConversationsFromConversationObjectBlock = new TransformManyBlock<ConversationsModelJson, ConversationModelJson>(conversation => ModelConverter.ConvertConversationsObjectJsonToConversationJson(conversation));
var convertConversationsBlock = new TransformBlock<ConversationModelJson, ConversationModel>(conversation => ModelConverter.ConvertConversationJsonToConversation(conversation));
var batchConversionBlock = new BatchBlock<ConversationModel>(batchBlockSize);
var convertConversationsToDTBlock = new TransformBlock<IEnumerable<ConversationModel>, DataTable>(conversations => ModelConverter.ConvertConversationModelToConversationDT(conversations));
var writeConversationsBlock = new ActionBlock<DataTable>(async conversations => await p.ProcessConversationsAsync(conversations));

var getUsersFromConversationsBlock = new TransformManyBlock<ConversationsModelJson, UserModelJson>(conversations => ModelConverter.ConvertConversationsJsonToUsersJson(conversations));
var broadcastUserBlock = new BroadcastBlock<UserModelJson>(userModelJson => userModelJson);

//User
var convertUsersBlock = new TransformBlock<UserModelJson, UserModel>(user => ModelConverter.ConvertUserJsonToUser(user));
var batchUsersBlock = new BatchBlock<UserModel>(batchBlockSize);
var convertUsersToDTBlock = new TransformBlock<IEnumerable<UserModel>, DataTable>(users => ModelConverter.ConvertUserModelToUserDT(users));
var writeUsersBlock = new ActionBlock<DataTable>(async users => await p.ProcessUsersAsync(users));

//UserTag
var getUserTagsFromUserBlock = new TransformBlock<UserModelJson, UserTagModel>(user => ModelConverter.ConvertUserJsonToUserTag(user));
var batchTagsBlock = new BatchBlock<UserTagModel>(batchBlockSize);
var convertTagsToDTBlock = new TransformBlock<IEnumerable<UserTagModel>, DataTable>(tags => ModelConverter.ConvertUserTagModelToUserTagDT(tags));
var writeTagsBlock = new ActionBlock<DataTable>(async tags => await p.ProcessUserTagsAsync(tags));


DataflowLinkOptions linkOptions = new DataflowLinkOptions()
{
    PropagateCompletion = true
};

loadTicketsBlock.LinkTo(broadcastConversationsObjectBlock, linkOptions);

//Conversation
broadcastConversationsObjectBlock.LinkTo(getConversationsFromConversationObjectBlock, linkOptions);
getConversationsFromConversationObjectBlock.LinkTo(convertConversationsBlock, linkOptions);
convertConversationsBlock.LinkTo(batchConversionBlock, linkOptions);
batchConversionBlock.LinkTo(convertConversationsToDTBlock, linkOptions);
convertConversationsToDTBlock.LinkTo(writeConversationsBlock, linkOptions);         

var tickets = await provider.GetAllTicketsAsync();

foreach (var ticket in tickets)
{
    cts.Token.ThrowIfCancellationRequested();
    await loadTicketsBlock.SendAsync(ticket.TicketID);
}

loadTicketsBlock.Complete();

The LinkTo calls are repeated for each type of data to be written.

I know when the whole pipeline has completed:

await Task.WhenAll(<Last block of each branch>.Completion);
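For reference, the whole-pipeline wait can be sketched in a self-contained form. This is a minimal illustration rather than the real pipeline: FanOutDemo, the two in-memory queues, and the integer payload are hypothetical stand-ins for the ticket models and database writes. It assumes every target block is unbounded, so the BroadcastBlock never has a message declined.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutDemo
{
    // Hypothetical stand-in: one broadcast source feeding two terminal branches.
    public static async Task<(int[] Doubled, string[] Labels)> RunAsync(int[] inputs)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var doubled = new ConcurrentQueue<int>();
        var labels = new ConcurrentQueue<string>();

        var broadcast = new BroadcastBlock<int>(x => x);
        // Each ActionBlock plays the role of the last block of one branch.
        var writeDoubled = new ActionBlock<int>(x => doubled.Enqueue(x * 2));
        var writeLabels = new ActionBlock<int>(x => labels.Enqueue($"item-{x}"));

        broadcast.LinkTo(writeDoubled, linkOptions);
        broadcast.LinkTo(writeLabels, linkOptions);

        foreach (var input in inputs)
            await broadcast.SendAsync(input);

        // Completion propagates from the source to both branches.
        broadcast.Complete();

        // The pipeline is finished only when every terminal block has completed.
        await Task.WhenAll(writeDoubled.Completion, writeLabels.Completion);
        return (doubled.ToArray(), labels.ToArray());
    }
}
```

This tells you when everything is done, but nothing about an individual input, which is the gap the question is about.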

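But if I pass ticket number 1 into loadTicketsBlock, how do I know when that specific ticket has passed through every block in the pipeline and is therefore complete?

The reason I want to know this is so that I can report to the UI that ticket 1 of 100 is complete.

Best answer

You could consider using TaskCompletionSource as the base class for all of your sub-entities. For example: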

class Attachment : TaskCompletionSource
{
}

class Conversation : TaskCompletionSource
{
}

Then, each time a sub-entity is inserted into the database, you mark it as completed:

attachment.SetResult();

...or, if the insert fails, you mark it as faulted:

attachment.SetException(ex);
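One way these two calls might sit inside a terminal block is sketched below. WriteBlockFactory and the insertAsync delegate are hypothetical names standing in for the real database write, and the Id property is added only for illustration. The non-generic TaskCompletionSource used as a base class requires .NET 5 or later.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Non-generic TaskCompletionSource is available from .NET 5 onwards.
public class Attachment : TaskCompletionSource
{
    public int Id { get; init; } // illustrative property, not from the original post
}

public static class WriteBlockFactory
{
    // insertAsync is a hypothetical stand-in for the real database insert.
    public static ActionBlock<Attachment> Create(Func<Attachment, Task> insertAsync) =>
        new ActionBlock<Attachment>(async attachment =>
        {
            try
            {
                await insertAsync(attachment);
                attachment.SetResult();      // this entity made it into the database
            }
            catch (Exception ex)
            {
                // Catching here keeps the block from faulting, so later items are
                // still processed; the failure is visible through attachment.Task.
                attachment.SetException(ex);
            }
        });
}
```

Whether a failed insert should also fault the block is a design choice; this sketch assumes it should not.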

Finally, you can use the Task.WhenAll method to combine all of the asynchronous completions into one:

Task ticketCompletion = Task.WhenAll(Enumerable.Empty<Task>()
    .Append(ticket.Task)
    .Concat(attachments.Select(e => e.Task))
    .Concat(conversations.Select(e => e.Task)));
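To turn those per-ticket tasks into the "ticket 1 of 100" report the question asks for, something along these lines could work. It is only a sketch: TicketProgress, TrackAsync, and the report callback are hypothetical names, and it assumes one combined task per ticket, built as in the snippet above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TicketProgress
{
    // Calls report(done, total) each time one ticket's combined task finishes.
    // ticketCompletions is assumed to hold one Task.WhenAll(...) per ticket.
    public static async Task TrackAsync(
        IReadOnlyCollection<Task> ticketCompletions,
        Action<int, int> report)
    {
        int done = 0;
        var observers = ticketCompletions.Select(async ticketCompletion =>
        {
            try
            {
                await ticketCompletion;
            }
            catch
            {
                // Assumption: a failed ticket still counts as processed here.
            }
            report(Interlocked.Increment(ref done), ticketCompletions.Count);
        });

        // Finishes once every ticket has been reported.
        await Task.WhenAll(observers);
    }
}
```

In a UI application the callback could forward to a Progress<T> instance created on the UI thread, since Progress<T> posts its reports to the synchronization context it was constructed on.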

Source: this question and answer come from Stack Overflow: https://stackoverflow.com/questions/70667482/
