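I am using TPL Dataflow to download data from a ticketing system.
The system takes a ticket number as input, calls the API, and receives a nested JSON response containing various pieces of information. Once the response is received, a set of blocks processes each level of the nested structure and writes it to a relational database: for example conversations, conversation attachments, users, user photos, user tags, etc.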
Json
{
  "conversations":[
    {
      "id":12345,
      "author_id":23456,
      "body":"First Conversation"
    },
    {
      "id":98765,
      "author_id":34567,
      "body":"Second Conversation",
      "attachments":[
        {
          "attachmentid":12345,
          "attachment_name":"Test.jpg"
        }
      ]
    }
  ],
  "users":[
    {
      "userid":12345,
      "user_name":"John Smith"
    },
    {
      "userid":34567,
      "user_name":"Joe Bloggs",
      "user_photo":{
        "photoid":44556,
        "photo_name":"headshot.jpg"
      },
      "tags":[
        "development",
        "vip"
      ]
    }
  ]
}
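Code
Some of the blocks need to be broadcast so that the deeper levels of nesting can still access the data. For example, UserModelJson is broadcast so that one block can handle writing users, one block can handle writing user tags, and one block can handle writing user photos.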
var loadTicketsBlock = new TransformBlock<int, ConversationsModelJson>(async ticketNumber => await p.GetConversationObjectFromTicket(ticketNumber));
var broadcastConversationsObjectBlock = new BroadcastBlock<ConversationsModelJson>(conversations => conversations);
//Conversation
var getConversationsFromConversationObjectBlock = new TransformManyBlock<ConversationsModelJson, ConversationModelJson>(conversation => ModelConverter.ConvertConversationsObjectJsonToConversationJson(conversation));
var convertConversationsBlock = new TransformBlock<ConversationModelJson, ConversationModel>(conversation => ModelConverter.ConvertConversationJsonToConversation(conversation));
var batchConversionBlock = new BatchBlock<ConversationModel>(batchBlockSize);
var convertConversationsToDTBlock = new TransformBlock<IEnumerable<ConversationModel>, DataTable>(conversations => ModelConverter.ConvertConversationModelToConversationDT(conversations));
var writeConversationsBlock = new ActionBlock<DataTable>(async conversations => await p.ProcessConversationsAsync(conversations));
var getUsersFromConversationsBlock = new TransformManyBlock<ConversationsModelJson, UserModelJson>(conversations => ModelConverter.ConvertConversationsJsonToUsersJson(conversations));
var broadcastUserBlock = new BroadcastBlock<UserModelJson>(userModelJson => userModelJson);
//User
var convertUsersBlock = new TransformBlock<UserModelJson, UserModel>(user => ModelConverter.ConvertUserJsonToUser(user));
var batchUsersBlock = new BatchBlock<UserModel>(batchBlockSize);
var convertUsersToDTBlock = new TransformBlock<IEnumerable<UserModel>, DataTable>(users => ModelConverter.ConvertUserModelToUserDT(users));
var writeUsersBlock = new ActionBlock<DataTable>(async users => await p.ProcessUsersAsync(users));
//UserTag
var getUserTagsFromUserBlock = new TransformBlock<UserModelJson, UserTagModel>(user => ModelConverter.ConvertUserJsonToUserTag(user));
var batchTagsBlock = new BatchBlock<UserTagModel>(batchBlockSize);
var convertTagsToDTBlock = new TransformBlock<IEnumerable<UserTagModel>, DataTable>(tags => ModelConverter.ConvertUserTagModelToUserTagDT(tags));
var writeTagsBlock = new ActionBlock<DataTable>(async tags => await p.ProcessUserTagsAsync(tags));
DataflowLinkOptions linkOptions = new DataflowLinkOptions()
{
    PropagateCompletion = true
};
loadTicketsBlock.LinkTo(broadcastConversationsObjectBlock, linkOptions);
//Conversation
broadcastConversationsObjectBlock.LinkTo(getConversationsFromConversationObjectBlock, linkOptions);
getConversationsFromConversationObjectBlock.LinkTo(convertConversationsBlock, linkOptions);
convertConversationsBlock.LinkTo(batchConversionBlock, linkOptions);
batchConversionBlock.LinkTo(convertConversationsToDTBlock, linkOptions);
convertConversationsToDTBlock.LinkTo(writeConversationsBlock, linkOptions);
var tickets = await provider.GetAllTicketsAsync();
foreach (var ticket in tickets)
{
    cts.Token.ThrowIfCancellationRequested();
    await loadTicketsBlock.SendAsync(ticket.TicketID);
}
loadTicketsBlock.Complete();
The LinkTo calls are repeated for each type of data to be written.
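To make the broadcast fan-out concrete, here is a minimal sketch of the users branch in isolation: a BroadcastBlock offers each user to one block that "writes" users and to a TransformManyBlock that extracts tags. It assumes the System.Threading.Tasks.Dataflow NuGet package; the User record, FanOutDemo, and the in-memory bags standing in for database writes are hypothetical, not the question's real models. It also shows why whole-pipeline completion means awaiting the last block of every branch.

```csharp
// Sketch only: assumes the System.Threading.Tasks.Dataflow NuGet package.
// User and the ConcurrentBags are hypothetical stand-ins for the real
// UserModelJson and the database writes.
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutDemo
{
    public sealed record User(int Id, string Name, string[] Tags);

    public static async Task<(int Users, int Tags)> RunAsync(User[] input)
    {
        var writtenUsers = new ConcurrentBag<string>();
        var writtenTags = new ConcurrentBag<string>();

        // Every message sent to a BroadcastBlock is offered to all linked targets.
        var broadcastUserBlock = new BroadcastBlock<User>(user => user);
        var writeUsersBlock = new ActionBlock<User>(user => writtenUsers.Add(user.Name));
        var getTagsBlock = new TransformManyBlock<User, string>(user => user.Tags);
        var writeTagsBlock = new ActionBlock<string>(tag => writtenTags.Add(tag));

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        broadcastUserBlock.LinkTo(writeUsersBlock, linkOptions); // branch 1: users
        broadcastUserBlock.LinkTo(getTagsBlock, linkOptions);    // branch 2: tags
        getTagsBlock.LinkTo(writeTagsBlock, linkOptions);

        foreach (var u in input)
            await broadcastUserBlock.SendAsync(u);
        broadcastUserBlock.Complete();

        // The pipeline is finished only when the last block of every branch is.
        await Task.WhenAll(writeUsersBlock.Completion, writeTagsBlock.Completion);
        return (writtenUsers.Count, writtenTags.Count);
    }
}
```

The targets here use the default unbounded capacity, so the BroadcastBlock never has to drop a message; with bounded targets a BroadcastBlock may overwrite messages that a slow branch has not accepted yet.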
I know when the entire pipeline has completed:
await Task.WhenAll(<Last block of each branch>.Completion);
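But if I pass ticket number 1 into the loadTicketsBlock block, how do I know when that particular ticket has passed through all the blocks in the pipeline and is therefore complete?
The reason I want to know this is so that I can report to the UI that ticket 1 of 100 has completed.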
Best answer
You could consider using TaskCompletionSource as the base class for all the child entities. For example:
class Attachment : TaskCompletionSource
{
}
class Conversation : TaskCompletionSource
{
}
Then, every time a child entity is inserted into the database, mark it as completed:
attachment.SetResult();
...or, if the insertion fails, mark it as faulted:
attachment.SetException(ex);
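A minimal sketch of how a write step might signal every entity in a batch, assuming .NET 5 or later (where the non-generic TaskCompletionSource exists). UserTag, BatchWriter.WriteAndSignalAsync, and the writeAsync delegate are hypothetical names; in the question's pipeline this body would sit inside the ActionBlock that writes a batch.

```csharp
// Sketch only: assumes .NET 5+. UserTag, BatchWriter and writeAsync are hypothetical.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class UserTag : TaskCompletionSource
{
    // RunContinuationsAsynchronously keeps continuations (e.g. progress
    // reporting) from running inline on the thread that calls TrySetResult.
    public UserTag(string name)
        : base(TaskCreationOptions.RunContinuationsAsynchronously) => Name = name;
    public string Name { get; }
}

public static class BatchWriter
{
    // Writes one batch, then completes or faults each entity's Task.
    public static async Task WriteAndSignalAsync(
        IReadOnlyList<UserTag> batch, Func<IReadOnlyList<UserTag>, Task> writeAsync)
    {
        try
        {
            await writeAsync(batch);
            foreach (var tag in batch) tag.TrySetResult();
        }
        catch (Exception ex)
        {
            // Fault every entity in the failed batch. The exception is not
            // rethrown, so the ActionBlock running this keeps accepting batches.
            foreach (var tag in batch) tag.TrySetException(ex);
        }
    }
}
```

The Try variants are used so that signalling an entity twice does not throw, unlike SetResult and SetException. Swallowing the exception is a deliberate choice: an exception escaping an ActionBlock delegate faults the block, and it stops processing further messages.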
Finally, you can combine all of the asynchronous completions into one with the Task.WhenAll method:
Task ticketCompletion = Task.WhenAll(Enumerable.Empty<Task>()
.Append(ticket.Task)
.Concat(attachments.Select(e => e.Task))
.Concat(conversations.Select(e => e.Task)));
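Putting the answer's pieces together, here is an end-to-end sketch limited to the conversations branch. It assumes .NET 5 or later and the System.Threading.Tasks.Dataflow NuGet package; Conversation, Ticket, TicketProgressDemo, the simulated load step, and the Task.Delay standing in for the database insert are all hypothetical. Each ticket's completion is the Task.WhenAll of its children's tasks, and a continuation reports "ticket n of N" when it finishes.

```csharp
// Sketch only: assumes .NET 5+ (non-generic TaskCompletionSource) and the
// System.Threading.Tasks.Dataflow NuGet package. All names are hypothetical.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Child entity whose Task completes once it has been "written".
public sealed class Conversation : TaskCompletionSource
{
    public Conversation(int ticketId)
        : base(TaskCreationOptions.RunContinuationsAsynchronously) => TicketId = ticketId;
    public int TicketId { get; }
}

public sealed record Ticket(int Id, IReadOnlyList<Conversation> Conversations);

public static class TicketProgressDemo
{
    public static async Task<List<string>> RunAsync(int ticketCount, int batchSize)
    {
        var reports = new ConcurrentQueue<string>();
        var ticketCompletions = new ConcurrentQueue<Task>();
        int completedTickets = 0;

        // Stand-in for the API call: every ticket yields three conversations.
        var loadTicketsBlock = new TransformBlock<int, Ticket>(id =>
        {
            var ticket = new Ticket(id, Enumerable.Range(0, 3)
                .Select(_ => new Conversation(id)).ToList());

            // One task per ticket that finishes when all of its children finish,
            // followed by a progress report.
            Task ticketCompletion = Task
                .WhenAll(ticket.Conversations.Select(c => c.Task))
                .ContinueWith(t =>
                {
                    int n = Interlocked.Increment(ref completedTickets);
                    string status = t.IsFaulted ? "failed" : "completed";
                    reports.Enqueue($"Ticket {id} {status} ({n} of {ticketCount})");
                }, TaskScheduler.Default);
            ticketCompletions.Enqueue(ticketCompletion);
            return ticket;
        });

        var getConversationsBlock = new TransformManyBlock<Ticket, Conversation>(t => t.Conversations);
        var batchConversationsBlock = new BatchBlock<Conversation>(batchSize);
        var writeConversationsBlock = new ActionBlock<Conversation[]>(async batch =>
        {
            try
            {
                await Task.Delay(10); // stand-in for the database insert
                foreach (var c in batch) c.TrySetResult();
            }
            catch (Exception ex)
            {
                foreach (var c in batch) c.TrySetException(ex);
            }
        });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        loadTicketsBlock.LinkTo(getConversationsBlock, linkOptions);
        getConversationsBlock.LinkTo(batchConversationsBlock, linkOptions);
        batchConversationsBlock.LinkTo(writeConversationsBlock, linkOptions);

        for (int ticketId = 1; ticketId <= ticketCount; ticketId++)
            await loadTicketsBlock.SendAsync(ticketId);
        loadTicketsBlock.Complete();

        await writeConversationsBlock.Completion;
        await Task.WhenAll(ticketCompletions); // let the last reports run
        return reports.ToList();
    }
}
```

Two design notes. A BatchBlock only emits a batch when it is full, when it completes, or when TriggerBatch is called, so a ticket whose children sit in a partial batch is reported only once enough later tickets arrive; with a large batchBlockSize the progress reports will come in bursts. In a real UI the report would more likely go through IProgress<T> than a queue, and the report order is not guaranteed to match the order in which tickets were sent.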
Regarding "c# - Report when the input to the first dataflow block has completed all linked blocks", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/70667482/