c# - TPL Dataflow - processing in parallel and asynchronously while preserving order

Tags: c# asynchronous task-parallel-library tpl-dataflow

I created a TPL Dataflow pipeline that consists of three TransformBlocks and a final ActionBlock.

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, job => job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

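Each block fills the Job object with the data it has processed, because I need not only the data itself but also general information for the response message. Basically, I put in the path to an XML file and get back a response object that tells me whether everything went well.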

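If two or more files take some time to read from the HDD, how can I make the pipeline read them in parallel and asynchronously while preserving the order in which they came in? If file 1 takes longer, file 2 has to wait until file 1 is finished before the data is passed to the next block. That block should then also validate the data in parallel and asynchronously, again preserving the order for the block after it.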

Right now it looks as if all files are processed sequentially, even though I call SendAsync on the head block.
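One thing worth checking for the behaviour described above: ExecutionDataflowBlockOptions.MaxDegreeOfParallelism defaults to 1, so a block created without options (like the three TransformBlocks in the first snippet) handles one message at a time, however the messages are sent. The following is a minimal sketch of that, not the asker's code. The class ParallelismSketch, the method MeasurePeakConcurrencyAsync and the 100 ms delay are all hypothetical names and values chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the question's code.
public static class ParallelismSketch
{
    // Pushes `count` items through a TransformBlock built with `options` and
    // returns the largest number of delegate invocations in flight at once.
    public static async Task<int> MeasurePeakConcurrencyAsync(
        ExecutionDataflowBlockOptions options, int count = 8)
    {
        var gate = new object();
        int running = 0, peak = 0;

        var block = new TransformBlock<int, int>(async n =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(100); // stand-in for slow file I/O (assumed value)
            lock (gate) { running--; }
            return n;
        }, options);

        // A TransformBlock only completes once its output has been consumed,
        // so the results are discarded here.
        block.LinkTo(DataflowBlock.NullTarget<int>());

        for (int i = 0; i < count; i++)
            await block.SendAsync(i);

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

Calling MeasurePeakConcurrencyAsync(new ExecutionDataflowBlockOptions()) should report a peak of 1, whereas passing new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 } allows up to four invocations to overlap.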

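Edit: I wrote a small test class for the pipeline. It has 3 stages. What I want to achieve is that the first TransformBlock keeps reading files as they come in (SendAsync from a FileSystemWatcher) and, once finished, outputs them in the order in which the files came in. That means if File1 is a big file and File2 and File3 come in, both will be read while File1 is still being processed, but File2 and File3 have to wait before they can be sent to the second TransformBlock, because File1 is still being read. Stage 2 should work the same way. Stage 3, on the other hand, needs to take the objects generated from File1 and save them to the database, which can be done in parallel and asynchronously. However, the objects from File1 need to be processed before those from File2 and File3, so the contents of the files have to be processed in the order in which the files came in. I tried to do that by limiting the third TransformBlock with MaxDegreeOfParallelism and BoundedCapacity both set to 1, but that seems to fail and does not really keep the order in the Console.WriteLine output.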

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing
{
    public class Job
    {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test
    {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job)
        {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job)
        {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job)
        {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO =>
            {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o)
        {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job)
        {
            if(job.ReturnCode == 100)
            {
                Console.WriteLine("ID {0} was successfully imported.", job.ID);

            }
            else
            {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline()
        {
            var loadXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ReadDocument(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ValidateXml(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ProcessJob(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, ActionBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, job => job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            importJob.LinkTo(reportImport);

            // Return the head of the network.
            return loadXml;
        }

        public void Start()
        {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id)
        {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel()
        {
            if(cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program
    {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args)
        {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach(var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

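EDIT 2: After I changed BoundedCapacity to Unbounded, I got everything in the order in which I sent it into the pipeline. So it was not really out of order before, but I guess messages were dropped?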

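If I set EnsureOrdered to true and MaxDegreeOfParallelism to 8 on the last TransformBlock, the items are no longer in order, as you can see in the output below. But this is exactly where they need to be in order, because I save the data to the database and that has to happen in the order in which it came in. It does not matter whether the items are out of order when they leave the last TransformBlock. So I guess I cannot keep the parallelism here?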

ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14

EDIT 3: Output after using @JSteward's latest code.

ReadDocument 09:01:03.9363340 JobId: 1
ReadDocument 09:01:03.9368357 JobId: 5
ReadDocument 09:01:03.9373347 JobId: 6
ReadDocument 09:01:03.9368357 JobId: 8
ReadDocument 09:01:03.9363340 JobId: 4
ReadDocument 09:01:03.9373347 JobId: 3
ReadDocument 09:01:03.9373347 JobId: 7
ReadDocument 09:01:03.9368357 JobId: 2
ReadDocument 09:01:05.2037570 JobId: 9
ReadDocument 09:01:05.3108413 JobId: 10
ReadDocument 09:01:05.5678177 JobId: 11
ReadDocument 09:01:05.6308763 JobId: 12
ValidateXml 09:01:05.6338782 JobId: 1
ValidateXml 09:01:06.3754174 JobId: 2
ReadDocument 09:01:06.3764184 JobId: 13
ReadDocument 09:01:06.3764184 JobId: 14
ReadDocument 09:01:07.3756634 JobId: 15
ReadDocument 09:01:07.3756634 JobId: 18
ValidateXml 09:01:07.3756634 JobId: 3
ValidateXml 09:01:07.3756634 JobId: 4
ReadDocument 09:01:07.3756634 JobId: 17
ReadDocument 09:01:07.3756634 JobId: 16
ReadDocument 09:01:08.3753887 JobId: 19
ReadDocument 09:01:08.3753887 JobId: 20
ValidateXml 09:01:08.3753887 JobId: 5
ProcessJob 09:01:08.3763906 JobId: 1
ReadDocument 09:01:09.3744411 JobId: 21
ReadDocument 09:01:09.3749410 JobId: 24
ProcessJob 09:01:09.3749410 JobId: 2
ReadDocument 09:01:09.3749410 JobId: 22
ReadDocument 09:01:09.3749410 JobId: 23
ReadDocument 09:01:10.3752061 JobId: 25
ReadDocument 09:01:10.3752061 JobId: 27
ValidateXml 09:01:10.3752061 JobId: 6
ValidateXml 09:01:10.3752061 JobId: 7
ValidateXml 09:01:10.3752061 JobId: 8
ReadDocument 09:01:10.3752061 JobId: 26
ReadDocument 09:01:11.3759294 JobId: 29
ReadDocument 09:01:11.3759294 JobId: 28
ValidateXml 09:01:11.3764278 JobId: 10
ReadDocument 09:01:11.3759294 JobId: 31
ValidateXml 09:01:11.3759294 JobId: 9
ReadDocument 09:01:11.3759294 JobId: 30
ValidateXml 09:01:12.3751553 JobId: 11
ReadDocument 09:01:12.3751553 JobId: 33
ValidateXml 09:01:12.3751553 JobId: 12
ReadDocument 09:01:12.3751553 JobId: 34
ReadDocument 09:01:12.3751553 JobId: 32
ValidateXml 09:01:13.3753842 JobId: 13
ValidateXml 09:01:13.3753842 JobId: 14
ValidateXml 09:01:13.3753842 JobId: 16
ReadDocument 09:01:13.3753842 JobId: 35
ReadDocument 09:01:13.3753842 JobId: 36
ValidateXml 09:01:13.3753842 JobId: 15
ReadDocument 09:01:14.3756414 JobId: 37
ValidateXml 09:01:14.3756414 JobId: 19
ValidateXml 09:01:14.3756414 JobId: 18
ValidateXml 09:01:14.3756414 JobId: 17
ReadDocument 09:01:14.3756414 JobId: 40
ReadDocument 09:01:14.3756414 JobId: 38
ReadDocument 09:01:14.3756414 JobId: 39
ProcessJob 09:01:14.3761419 JobId: 3
SendToDataBase 09:01:14.3806453 JobId: 1
SendToDataBase 09:01:14.3821472 JobId: 2
ProcessJob 09:01:14.3821472 JobId: 4
ValidateXml 09:01:15.3763758 JobId: 20
ReadDocument 09:01:15.3763758 JobId: 42
ValidateXml 09:01:15.3763758 JobId: 21
ReadDocument 09:01:15.3773772 JobId: 43
ReadDocument 09:01:15.3763758 JobId: 41
ValidateXml 09:01:15.3768800 JobId: 22
ReadDocument 09:01:15.3773772 JobId: 44
ValidateXml 09:01:16.3761117 JobId: 23
ValidateXml 09:01:16.3761117 JobId: 26
ValidateXml 09:01:16.3761117 JobId: 24
ValidateXml 09:01:16.3761117 JobId: 25
ReadDocument 09:01:16.3761117 JobId: 45
ReadDocument 09:01:16.3761117 JobId: 46
ProcessJob 09:01:16.3761117 JobId: 5
ReadDocument 09:01:17.3758334 JobId: 47
ValidateXml 09:01:17.3763315 JobId: 28
ValidateXml 09:01:17.3763315 JobId: 27
ReadDocument 09:01:17.3763315 JobId: 49
ReadDocument 09:01:17.3763315 JobId: 48
ProcessJob 09:01:17.3763315 JobId: 6
ValidateXml 09:01:17.3763315 JobId: 29
ReadDocument 09:01:17.3763315 JobId: 50
ReadDocument 09:01:18.3755786 JobId: 51
ReadDocument 09:01:18.3755786 JobId: 52
<<<
ProcessJob 09:01:18.3770792 JobId: 10
ProcessJob 09:01:18.3770792 JobId: 9
ProcessJob 09:01:18.3755786 JobId: 7
>>>
ReadDocument 09:01:18.3755786 JobId: 53
ValidateXml 09:01:18.3755786 JobId: 32
ValidateXml 09:01:18.3755786 JobId: 31
ValidateXml 09:01:18.3755786 JobId: 30
ReadDocument 09:01:18.3760794 JobId: 54
ProcessJob 09:01:18.3755786 JobId: 8
ValidateXml 09:01:19.3753274 JobId: 34
ValidateXml 09:01:19.3753274 JobId: 33
ReadDocument 09:01:19.3758261 JobId: 56
ReadDocument 09:01:19.3758261 JobId: 55
ValidateXml 09:01:19.3758261 JobId: 35
ValidateXml 09:01:20.3752782 JobId: 36
ValidateXml 09:01:20.3752782 JobId: 37
ProcessJob 09:01:20.3757709 JobId: 11
ReadDocument 09:01:20.3752782 JobId: 57
ValidateXml 09:01:20.3752782 JobId: 38
ReadDocument 09:01:20.3757709 JobId: 58
ReadDocument 09:01:20.3757709 JobId: 59
ProcessJob 09:01:21.3757202 JobId: 12
ValidateXml 09:01:21.3757202 JobId: 39
ReadDocument 09:01:21.3757202 JobId: 62
ReadDocument 09:01:21.3757202 JobId: 61
ReadDocument 09:01:21.3757202 JobId: 60
ReadDocument 09:01:22.3764154 JobId: 63
ReadDocument 09:01:22.3764154 JobId: 64
ReadDocument 09:01:22.3764154 JobId: 65
ProcessJob 09:01:22.3794167 JobId: 16
ValidateXml 09:01:22.3764154 JobId: 40
ValidateXml 09:01:22.3764154 JobId: 42
ReadDocument 09:01:22.3764154 JobId: 66
ValidateXml 09:01:22.3774149 JobId: 43
ProcessJob 09:01:22.3764154 JobId: 13
ValidateXml 09:01:22.3764154 JobId: 41
ProcessJob 09:01:22.3779160 JobId: 15
SendToDataBase 09:01:22.3784159 JobId: 3
ProcessJob 09:01:22.3764154 JobId: 14
ValidateXml 09:01:22.3859209 JobId: 44
SendToDataBase 09:01:22.4309993 JobId: 4
SendToDataBase 09:01:22.4460051 JobId: 5
SendToDataBase 09:01:22.4465047 JobId: 6
ReadDocument 09:01:23.3760112 JobId: 67
ValidateXml 09:01:23.3760112 JobId: 46
ValidateXml 09:01:23.3760112 JobId: 47
ReadDocument 09:01:23.3760112 JobId: 68
ValidateXml 09:01:23.3760112 JobId: 45
ProcessJob 09:01:23.3760112 JobId: 17
ValidateXml 09:01:24.3762581 JobId: 48
ReadDocument 09:01:24.3762581 JobId: 69
ProcessJob 09:01:24.3762581 JobId: 18
ProcessJob 09:01:24.3762581 JobId: 19
ReadDocument 09:01:24.3762581 JobId: 70
CreateResponse 09:01:24.3777606 JobId: 58
CreateResponse 09:01:24.3994684 JobId: 59
CreateResponse 09:01:24.4059908 JobId: 60
CreateResponse 09:01:24.4114777 JobId: 61
CreateResponse 09:01:24.4134789 JobId: 62
ValidateXml 09:01:25.3759607 JobId: 49
ValidateXml 09:01:25.3759607 JobId: 51
ProcessJob 09:01:25.3784627 JobId: 22
ValidateXml 09:01:25.3759607 JobId: 52
ProcessJob 09:01:25.3759607 JobId: 20
ValidateXml 09:01:25.3774629 JobId: 53
ValidateXml 09:01:25.3759607 JobId: 50
ValidateXml 09:01:25.3774629 JobId: 54
ReadDocument 09:01:25.3759607 JobId: 72
ReadDocument 09:01:25.3774629 JobId: 73
ReadDocument 09:01:25.3759607 JobId: 71
ReadDocument 09:01:25.3779625 JobId: 74
ProcessJob 09:01:25.3759607 JobId: 21
SendToDataBase 09:01:25.3774629 JobId: 7
CreateResponse 09:01:25.3759607 JobId: 39
SendToDataBase 09:01:25.4398495 JobId: 8
SendToDataBase 09:01:25.4448555 JobId: 9
SendToDataBase 09:01:25.4478565 JobId: 10
SendToDataBase 09:01:25.4483570 JobId: 11
CreateResponse 09:01:25.4448555 JobId: 42
CreateResponse 09:01:25.4608868 JobId: 43
SendToDataBase 09:01:25.4553682 JobId: 12
CreateResponse 09:01:25.4613665 JobId: 44
CreateResponse 09:01:25.4698849 JobId: 45
ReadDocument 09:01:26.3754874 JobId: 75
ReadDocument 09:01:26.3754874 JobId: 76
ReadDocument 09:01:26.3754874 JobId: 78
ValidateXml 09:01:26.3754874 JobId: 55
ProcessJob 09:01:26.3759876 JobId: 24
ProcessJob 09:01:26.3754874 JobId: 23
ReadDocument 09:01:26.3754874 JobId: 77
SendToDataBase 09:01:26.3759876 JobId: 13
SendToDataBase 09:01:26.3980055 JobId: 14
SendToDataBase 09:01:26.3985045 JobId: 15
SendToDataBase 09:01:26.4020099 JobId: 16
ReadDocument 09:01:27.3762164 JobId: 79
ValidateXml 09:01:27.3762164 JobId: 56
ProcessJob 09:01:27.3762164 JobId: 26
ReadDocument 09:01:27.3762164 JobId: 82
ProcessJob 09:01:27.3762164 JobId: 25
ReadDocument 09:01:27.3762164 JobId: 81
ReadDocument 09:01:27.3762164 JobId: 80
ValidateXml 09:01:27.3762164 JobId: 63
ValidateXml 09:01:27.3777165 JobId: 64
ProcessJob 09:01:27.3767157 JobId: 27
ValidateXml 09:01:27.3762164 JobId: 57
SendToDataBase 09:01:27.3777165 JobId: 17
SendToDataBase 09:01:27.4327571 JobId: 18
SendToDataBase 09:01:27.4357587 JobId: 19
ReadDocument 09:01:28.3761410 JobId: 83
ProcessJob 09:01:28.3761410 JobId: 28
ProcessJob 09:01:28.3761410 JobId: 29
ValidateXml 09:01:28.3761410 JobId: 66
SendToDataBase 09:01:28.3761410 JobId: 20
ProcessJob 09:01:28.3761410 JobId: 30
ValidateXml 09:01:28.3761410 JobId: 67
ValidateXml 09:01:28.3761410 JobId: 65
SendToDataBase 09:01:28.3861483 JobId: 21
SendToDataBase 09:01:28.4141687 JobId: 22
ReadDocument 09:01:28.6079764 JobId: 84
ReadDocument 09:01:28.6552491 JobId: 85
ReadDocument 09:01:28.7047606 JobId: 86
ValidateXml 09:01:28.7327861 JobId: 68
ProcessJob 09:01:28.7327861 JobId: 31
ReadDocument 09:01:29.1285484 JobId: 87
ProcessJob 09:01:29.1894672 JobId: 32
SendToDataBase 09:01:29.1894672 JobId: 23
SendToDataBase 09:01:29.1944706 JobId: 24
ReadDocument 09:01:29.3910070 JobId: 88
ValidateXml 09:01:29.5569691 JobId: 69
ReadDocument 09:01:29.5995036 JobId: 89
ValidateXml 09:01:29.6085095 JobId: 70
ReadDocument 09:01:29.6581266 JobId: 90
ValidateXml 09:01:29.8797899 JobId: 71
ValidateXml 09:01:30.1244519 JobId: 72
ValidateXml 09:01:30.1584763 JobId: 73
ReadDocument 09:01:30.2100312 JobId: 91
ProcessJob 09:01:30.2490536 JobId: 33
ProcessJob 09:01:30.2950865 JobId: 34
ReadDocument 09:01:30.3290995 JobId: 92
ProcessJob 09:01:30.3636350 JobId: 35
SendToDataBase 09:01:30.3636350 JobId: 25
SendToDataBase 09:01:30.3701300 JobId: 26
SendToDataBase 09:01:30.3706299 JobId: 27
ProcessJob 09:01:30.4987430 JobId: 36
ReadDocument 09:01:30.5642707 JobId: 93
ReadDocument 09:01:30.6088035 JobId: 94
ValidateXml 09:01:30.7213868 JobId: 74
ReadDocument 09:01:30.7544106 JobId: 95
ProcessJob 09:01:30.7544106 JobId: 37
SendToDataBase 09:01:30.7544106 JobId: 28
ProcessJob 09:01:31.1091681 JobId: 38
SendToDataBase 09:01:31.1091681 JobId: 29
SendToDataBase 09:01:31.1151730 JobId: 30
ValidateXml 09:01:31.2012468 JobId: 75
ValidateXml 09:01:31.2827940 JobId: 76
ValidateXml 09:01:31.3143168 JobId: 77
ValidateXml 09:01:31.4073842 JobId: 78
ReadDocument 09:01:31.4369059 JobId: 96
ReadDocument 09:01:31.4699302 JobId: 97
ProcessJob 09:01:31.7201123 JobId: 40
SendToDataBase 09:01:31.7201123 JobId: 31
ProcessJob 09:01:32.1569310 JobId: 41
SendToDataBase 09:01:32.1569310 JobId: 32
ValidateXml 09:01:32.3650822 JobId: 79
ValidateXml 09:01:32.3650822 JobId: 80
ProcessJob 09:01:32.3966047 JobId: 46
ReadDocument 09:01:32.4236247 JobId: 98
ReadDocument 09:01:32.4831869 JobId: 99
ValidateXml 09:01:32.5607342 JobId: 81
ReadDocument 09:01:32.5777363 JobId: 100
ProcessJob 09:01:33.1461630 JobId: 47
ProcessJob 09:01:33.2081967 JobId: 48
SendToDataBase 09:01:33.2081967 JobId: 33
SendToDataBase 09:01:33.2137015 JobId: 34
SendToDataBase 09:01:33.2172021 JobId: 35
ValidateXml 09:01:33.2347146 JobId: 82
ValidateXml 09:01:33.4228519 JobId: 83
ProcessJob 09:01:33.4228519 JobId: 49
ValidateXml 09:01:33.4373638 JobId: 84
ProcessJob 09:01:33.4878995 JobId: 50
SendToDataBase 09:01:33.4878995 JobId: 36
ProcessJob 09:01:33.5819674 JobId: 51
ValidateXml 09:01:33.6239992 JobId: 85
ProcessJob 09:01:33.6239992 JobId: 52
SendToDataBase 09:01:33.6239992 JobId: 37
SendToDataBase 09:01:33.6295082 JobId: 38
ValidateXml 09:01:33.6870563 JobId: 86
ValidateXml 09:01:33.7125626 JobId: 87
ProcessJob 09:01:34.1238635 JobId: 53
ProcessJob 09:01:34.5796949 JobId: 54
<<<
SendToDataBase 09:01:34.5796949 JobId: 40
SendToDataBase 09:01:34.5856995 JobId: 41
SendToDataBase 09:01:34.5887008 JobId: 46
>>>
ValidateXml 09:01:34.7951688 JobId: 88
ValidateXml 09:01:34.9162007 JobId: 89
ProcessJob 09:01:34.9541705 JobId: 55
ValidateXml 09:01:35.0464443 JobId: 90
ProcessJob 09:01:35.3634898 JobId: 56
ProcessJob 09:01:35.3795024 JobId: 57
ValidateXml 09:01:35.5165095 JobId: 91
ValidateXml 09:01:35.8614345 JobId: 92
ProcessJob 09:01:35.9985415 JobId: 63
ValidateXml 09:01:36.0481807 JobId: 93
ProcessJob 09:01:36.0763064 JobId: 64
ProcessJob 09:01:36.0993229 JobId: 65
SendToDataBase 09:01:36.0993229 JobId: 47
SendToDataBase 09:01:36.1048270 JobId: 48
ValidateXml 09:01:36.1572079 JobId: 94
ValidateXml 09:01:36.3791015 JobId: 95
ProcessJob 09:01:36.4212607 JobId: 66
SendToDataBase 09:01:36.4212607 JobId: 49
SendToDataBase 09:01:36.4267655 JobId: 50
SendToDataBase 09:01:36.4272654 JobId: 51
SendToDataBase 09:01:36.4322913 JobId: 52
SendToDataBase 09:01:36.4327837 JobId: 53
ProcessJob 09:01:36.5149796 JobId: 67
SendToDataBase 09:01:36.5149796 JobId: 54
ValidateXml 09:01:36.6861048 JobId: 96
ValidateXml 09:01:36.7845716 JobId: 97
ValidateXml 09:01:37.0175979 JobId: 98
ValidateXml 09:01:37.3788835 JobId: 99
ValidateXml 09:01:37.6477046 JobId: 100
ProcessJob 09:01:37.8269808 JobId: 68
SendToDataBase 09:01:37.8269808 JobId: 55
ProcessJob 09:01:37.8940108 JobId: 69
ProcessJob 09:01:38.2955556 JobId: 70
ProcessJob 09:01:38.3110583 JobId: 71
SendToDataBase 09:01:38.3110583 JobId: 56
SendToDataBase 09:01:38.3125586 JobId: 57
CreateResponse 09:01:38.4551538 JobId: 95
CreateResponse 09:01:38.4925304 JobId: 96
ProcessJob 09:01:38.5382532 JobId: 72
ProcessJob 09:01:38.9129894 JobId: 73
SendToDataBase 09:01:38.9129894 JobId: 63
SendToDataBase 09:01:38.9185062 JobId: 64
SendToDataBase 09:01:38.9189949 JobId: 65
ProcessJob 09:01:38.9852121 JobId: 74
ProcessJob 09:01:39.0317458 JobId: 75
SendToDataBase 09:01:39.0317458 JobId: 66
SendToDataBase 09:01:39.0377511 JobId: 67
ProcessJob 09:01:39.6129381 JobId: 76
SendToDataBase 09:01:39.6129381 JobId: 68
ProcessJob 09:01:39.7833004 JobId: 77
SendToDataBase 09:01:39.7833004 JobId: 69
ProcessJob 09:01:39.8740443 JobId: 78
ProcessJob 09:01:40.3145731 JobId: 79
SendToDataBase 09:01:40.3145731 JobId: 70
SendToDataBase 09:01:40.3205708 JobId: 71
ProcessJob 09:01:40.4912084 JobId: 80
ProcessJob 09:01:40.5307205 JobId: 81
SendToDataBase 09:01:40.5317212 JobId: 72
ProcessJob 09:01:40.5652454 JobId: 82
ProcessJob 09:01:41.2902736 JobId: 83
ProcessJob 09:01:41.2902736 JobId: 84
ProcessJob 09:01:41.3598244 JobId: 85
SendToDataBase 09:01:41.3598244 JobId: 73
SendToDataBase 09:01:41.3663284 JobId: 74
SendToDataBase 09:01:41.3713317 JobId: 75
SendToDataBase 09:01:41.3718392 JobId: 76
SendToDataBase 09:01:41.3723328 JobId: 77
ProcessJob 09:01:42.2677493 JobId: 86
SendToDataBase 09:01:42.2677493 JobId: 78
ProcessJob 09:01:42.6466081 JobId: 87
ProcessJob 09:01:42.8947969 JobId: 88
SendToDataBase 09:01:42.8947969 JobId: 79
ProcessJob 09:01:43.0012509 JobId: 89
ProcessJob 09:01:43.1513589 JobId: 90
ProcessJob 09:01:43.4545800 JobId: 91
SendToDataBase 09:01:43.4545800 JobId: 80
SendToDataBase 09:01:43.4600832 JobId: 81
SendToDataBase 09:01:43.4605919 JobId: 82
ProcessJob 09:01:43.5946813 JobId: 92
ProcessJob 09:01:44.1731027 JobId: 93
SendToDataBase 09:01:44.1731027 JobId: 83
SendToDataBase 09:01:44.1786068 JobId: 84
SendToDataBase 09:01:44.1816090 JobId: 85
ProcessJob 09:01:44.4678171 JobId: 94
SendToDataBase 09:01:44.4678171 JobId: 86
ProcessJob 09:01:45.3426043 JobId: 97
SendToDataBase 09:01:45.3426043 JobId: 87
ProcessJob 09:01:45.3751270 JobId: 98
ProcessJob 09:01:45.7363757 JobId: 99
ProcessJob 09:01:45.7809216 JobId: 100
SendToDataBase 09:01:45.7809216 JobId: 88
SendToDataBase 09:01:45.7879270 JobId: 89
SendToDataBase 09:01:45.7925566 JobId: 90
SendToDataBase 09:01:45.8776726 JobId: 91
SendToDataBase 09:01:45.8776726 JobId: 92
SendToDataBase 09:01:46.5813640 JobId: 93
SendToDataBase 09:01:46.5813640 JobId: 94
SendToDataBase 09:01:47.7407165 JobId: 97
SendToDataBase 09:01:47.7407165 JobId: 98
SendToDataBase 09:01:48.4382058 JobId: 99
SendToDataBase 09:01:48.7357557 JobId: 100

Best answer

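You can do this if you link a TransformBlock to an ActionBlock.

This is easiest to demonstrate with a compilable console application.

The application processes a sequence of integers, but you can replace the integers with a custom unit-of-work class.

(I adapted this code from a utility I wrote that performs multithreaded file compression using the relatively slow LZMA compression algorithm. The utility had to read the input data sequentially from a file, pass it in blocks to a queue that processed the data in arbitrary order on multiple threads, and finally output the compressed blocks to a queue that had to preserve the original order of the data blocks.)

Sample code: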

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Program
    {
        public static void Main()
        {
            var data = Enumerable.Range(1, 100);
            var task = Process(data);

            Console.WriteLine("Waiting for task to complete");
            task.Wait();
            Console.WriteLine("Task complete.");
        }

        public static async Task Process(IEnumerable<int> data)
        {
            var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions());
            var writer = new ActionBlock<int>(block => write(block), actionBlockOptions());

            queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

            await enqueDataToProcessAndAwaitCompletion(data, queue);

            await writer.Completion;
        }

        static int process(int block)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}");
            emulateWorkload();
            return -block;
        }

        static void write(int block)
        {
            Console.WriteLine("Output: " + block);
        }

        static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue)
        {
            await enqueueDataToProcess(data, queue);
            queue.Complete();
        }

        static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue)
        {
            foreach (var item in data)
                await queue.SendAsync(item);
        }


        static ExecutionDataflowBlockOptions transformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static Random rng = new Random();
        static object locker = new object();

        static void emulateWorkload()
        {
            int delay;

            lock (locker)
            {
                delay = rng.Next(250, 750);
            }

            Thread.Sleep(delay);
        }
    }
}

Output:

Waiting for task to complete
Thread 8 is processing block 8
Thread 5 is processing block 2
Thread 6 is processing block 6
Thread 4 is processing block 5
Thread 7 is processing block 7
Thread 10 is processing block 4
Thread 9 is processing block 1
Thread 3 is processing block 3
Thread 3 is processing block 9
Thread 8 is processing block 10
Thread 5 is processing block 11
Thread 6 is processing block 12
Thread 9 is processing block 13
Thread 10 is processing block 14
Thread 7 is processing block 15
Thread 8 is processing block 16
Thread 4 is processing block 17
Thread 5 is processing block 18
Thread 3 is processing block 19
Thread 9 is processing block 20
Thread 8 is processing block 21
Output: -1
Output: -2
Output: -3
Output: -4
Output: -5
Output: -6
Output: -7
Output: -8
Output: -9
Output: -10
Output: -11
Output: -12
Output: -13
Thread 6 is processing block 22
Thread 10 is processing block 23
Output: -14
Thread 7 is processing block 24
Output: -15
Output: -16
Thread 6 is processing block 25
Output: -17
Thread 4 is processing block 26
Thread 5 is processing block 27
----------------->SNIP<-----------------
Thread 10 is processing block 93
Thread 8 is processing block 94
Output: -83
Thread 4 is processing block 95
Output: -84
Output: -85
Output: -86
Output: -87
Thread 3 is processing block 96
Output: -88
Thread 6 is processing block 97
Thread 5 is processing block 98
Thread 10 is processing block 99
Thread 9 is processing block 100
Output: -89
Output: -90
Output: -91
Output: -92
Output: -93
Output: -94
Output: -95
Output: -96
Output: -97
Output: -98
Output: -99
Output: -100
Task complete.
Press any key to continue . . .

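Note how the "blocks" are processed by multiple threads in arbitrary order, while the output order is the same as the input order.

It is important to set the options of the output ActionBlock as in the actionBlockOptions() method, with MaxDegreeOfParallelism and BoundedCapacity both set to 1.

That is what causes the output to be serialized in the correct order. If you set the output block's BoundedCapacity and MaxDegreeOfParallelism to more than 1, the output may come out in the wrong order.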
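To check the ordering claim programmatically rather than by reading console output, here is a small sketch under stated assumptions. OrderingSketch, RunAsync and the artificial delays are hypothetical and not taken from the answer. The workload is deliberately skewed so that later items finish first; the writer's view of the sequence is then compared with the input order. As far as I can tell, the ordering rests on two things: the TransformBlock releases results in arrival order (EnsureOrdered, which defaults to true), and the writer runs with MaxDegreeOfParallelism = 1. BoundedCapacity on the writer mainly controls buffering and back-pressure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the answer's code.
public static class OrderingSketch
{
    // Sends 1..count through a parallel TransformBlock into a sequential
    // ActionBlock and returns the values in the order the writer saw them.
    public static async Task<List<int>> RunAsync(int count = 40)
    {
        var written = new List<int>();

        var worker = new TransformBlock<int, int>(async n =>
        {
            // Assumed workload: later items get shorter delays, so the
            // completion order differs from the arrival order.
            await Task.Delay(5 * (count - n));
            return n;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 8,
            BoundedCapacity = 32,
            EnsureOrdered = true // the default, spelled out for clarity
        });

        // One item at a time, so List<int>.Add is never called concurrently.
        var writer = new ActionBlock<int>(n => written.Add(n),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            });

        worker.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= count; i++)
            await worker.SendAsync(i); // awaiting keeps the input order

        worker.Complete();
        await writer.Completion;
        return written;
    }
}
```

The list returned by RunAsync() can be compared with Enumerable.Range(1, 40) using SequenceEqual; a mismatch would indicate that the order was lost somewhere in the pipeline.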

Regarding "c# - TPL Dataflow - processing in parallel and asynchronously while preserving order", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41633764/
