c# - 如何正确管理 TPL 数据流中的完成

标签 c# task-parallel-library tpl-dataflow

我创建了类似于网络爬虫的东西来创建我需要管理的 1000 多个网络服务的报告。因此,我创建了一个 TPL 数据流管道来管理数据的获取和处理。 我想象中的 Pipeline 看起来有点像这样(对不起我的绘画技巧 :D): The Pipeline

我已经创建了一个实现并且一切正常,直到我开始整个流水线。我将 500 个对象放入管道作为管道的输入,并预计程序会运行一段时间,但程序在移动到执行 block 后停止执行。 在检查了 Programm 的流程后,在我看来 Completion 传播到 Dispose Block 的速度很快。 我使用相同的管道创建了一个小示例项目,以检查它是我对输入类的实现还是管道本身。示例代码是这样的:

public class Job
{
    public int Ticker { get; set; }

    public Type Type { get; }

    public Job(Type type)
    {
        Type = type;
    }

    public Task Prepare()
    {
        Console.WriteLine("Preparing");
        Ticker = 0;
        return Task.CompletedTask;
    }

    public Task Tick()
    {
        Console.WriteLine("Ticking");
        Ticker++;
        return Task.CompletedTask;
    }

    public bool IsCommitable()
    {
        Console.WriteLine("Trying to commit");
        return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
    }

    public bool IsFinished()
    {
        Console.WriteLine("Trying to finish");
        return Ticker == 1000000;
    }

    public void IntermediateCleanUp()
    {
        Console.WriteLine("intermediate Cleanup");
        Ticker = Ticker - 120;
    }

    public void finalCleanUp()
    {
        Console.WriteLine("Final Cleanup");
        Ticker = -1;
    }
}

这是我输入到准备 block 中的输入类。

public class Dataflow
{
    private TransformBlock<Job, Job> _preparationsBlock;

    private BufferBlock<Job> _balancerBlock;

    private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4
    };

    private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    private TransformBlock<Job, Job> _typeATickBlock;

    private TransformBlock<Job, Job> _typeBTickBlock;

    private TransformBlock<Job, Job> _writeBlock;

    private TransformBlock<Job, Job> _intermediateCleanupBlock;

    private ActionBlock<Job> _finalCleanupBlock;

    public async Task Process()
    {
        CreateBlocks();

        ConfigureBlocks();

        for (int i = 0; i < 500; i++)
        {
            await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
        }
        _preparationsBlock.Complete();

        await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
    }

    private void CreateBlocks()
    {
        _preparationsBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Prepare();
            return job;
        }, _options);

        _balancerBlock = new BufferBlock<Job>(_options);

        _typeATickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            return job;
        }, _options);

        _typeBTickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            await job.Tick();
            return job;
        }, _options);

        _writeBlock = new TransformBlock<Job, Job>(job =>
        {
            Console.WriteLine(job.Ticker);
            return job;
        }, _options);

        _finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);

        _intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
        {
            job.IntermediateCleanUp();
            return job;
        }, _options);
    }

    private void ConfigureBlocks()
    {
        _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

        _balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
        _balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);

        _typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
        _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());

        _typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());

        _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
        _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

        _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
    }
}

这是我的数据流管道,代表我上面的“艺术品”:D。 所有这些都在我的调度程序中执行,该调度程序在 Programm.cs 中启动:

public class Scheduler
{
    private readonly Timer _timer;

    private readonly Dataflow _flow;


    public Scheduler(int intervall)
    {
        _timer = new Timer(intervall);
        _flow = new Dataflow();
    }

    public void Start()
    {
        _timer.AutoReset = false;
        _timer.Elapsed += _timer_Elapsed;
        _timer.Start();
    }

    private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        try
        {
            _timer.Stop();
            Console.WriteLine("Timer stopped");
            await _flow.Process().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            Console.WriteLine("Timer started again.");
            _timer.Start();
        }
    }
}

class Program
{
    static  void Main(string[] args)
    {
        var scheduler = new Scheduler(1000);
        scheduler.Start();

        Console.ReadKey();

    }
}

我得到的控制台输出是: 计时器停止 准备中 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 滴答作响 试图 promise 试图完成 试图 promise 努力完成

似乎程序在那个时候已经停止工作,因为我没有遇到任何断点或进一步。我认为我所有的 block 都已经收到完成信号,因此停止接收任何新项目。因此我的问题是:如何管理完成信号,以便管道仅在没有更多工作要做时完成?

最佳答案

您的流程的主要问题是您的滴答 block 的反馈循环。这导致两个问题。

  • 背压
  • 完成流程

第一:背压

_typeATickBlock 链接回自身时,一旦达到其容量,它将停止接受所有消息。在您的案例 4 中,这意味着一旦它在输出缓冲区中有 3 条消息并且正在处理一条消息,它将停止接受和传递消息。您可以通过将以下行添加到 block 中来查看:

Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");

并将输出:

Tick Block 0/3

要解决此问题,您可以添加任何缓冲 block 、Buffer 或 Transform。关键是缓冲区的有限容量。在您的情况下,每条消息都需要重新路由回 tick block 。有了它,您就知道您的容量需要在任何给定时间与消息量相匹配。在本例中为 500。

_printingBuffer = new TransformBlock<Job, Job>(job =>
{
    Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
    return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });

在您的实际代码中,您可能不知道该值,Unbounded 可能是避免锁定管道的最佳选择,但您可以根据传入量调整该值。

二:完成流程

在你的管道中有一个反馈循环,完成传播变得比简单地设置链接选项更困难。一旦完成到达滴答 block ,它就会停止接受所有消息,即使是那些仍然需要处理的消息。为避免这种情况,您需要暂停传播,直到所有消息都通过循环。首先,您在滴答 block 之前停止传播,然后检查参与循环的每个 block 上的缓冲区。然后,一旦所有缓冲区都为空,就会将完成和故障传播到 block 。

_balancerBlock.Completion.ContinueWith(tsk =>
{
    while (!_typeATickBlock.Completion.IsCompleted)
    {
        if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
        && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
        {
            _typeATickBlock.Complete();
        }
    }
});

最后

带有完成设置和插入缓冲区的完整 ConfigureBlocks 应如下所示。请注意,我在这里只传递完整而不是错误,并且我删除了 B 型分支。

private void ConfigureBlocks()
{
    _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

    _balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);

    _balancerBlock.Completion.ContinueWith(tsk =>
    {
        while (!_typeATickBlock.Completion.IsCompleted)
        {
            if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
            && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
            {
                _typeATickBlock.Complete();
            }
        }
    });

    _typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
    _printingBuffer.LinkTo(_typeATickBlock);
    _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());            

    _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
    _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

    _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}

我前阵子写了一篇博文,博客不再活跃,关于用反馈循环处理完成。它可能会提供更多帮助。从 WayBackMachine 中检索。

Finding Completion in a Complex Flow: Feedback Loops

关于c# - 如何正确管理 TPL 数据流中的完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55324214/

相关文章:

c# - 并行嵌套操作返回奇怪的结果

c# - 在 C# 中使用 Async 和 await 时出错

c# - 数据流的 block 设计受阻

c# - IEnumerator 的源代码在哪里?

c# - 如何像在 Winform 中那样以编程方式在 WPF 中添加事件处理程序

c# - 使用响应式(Reactive)扩展,我可以创建可观察的订阅者,该订阅者会阻塞直到满足某些条件或发生超时

c# - 等待 ActionBlock<T> - TPL 数据流

c# - 避免在一个 block 出现故障时关闭整个数据流网络

c# - Xamarin Forms 主细节隐藏后退按钮

c# - 如何从 .Net Core 2.1 中的静态函数访问我的配置?