c# - 我应该如何使用 DataflowBlockOptions.CancellationToken?

标签 c# tpl-dataflow

我如何使用 DataflowBlockOptions.CancellationToken

如果我创建 BufferBlock 的实例像这样:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

然后有使用 queue 的消费者/生产者方法,如何使用它的 CancellationToken 来处理取消?

例如在生产者方法中,我如何检查取消 token - 我没有找到任何属性来访问 token ..

编辑: 生产/消费方法示例:

private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        await queue.SendAsync(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

调用代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

// Start the producer and consumer.
var values = Enumerable.Range(0, 10);
Produce(queue, values);
var consumer = Consume(queue);

// Wait for everything to complete.
await Task.WhenAll(consumer, queue.Completion);

编辑2:

如果我调用 _cts.Cancel() , Produce方法不会取消并不会中断地完成。

最佳答案

如果你想取消生产过程,你应该在其中传递 token ,如下所示:

    private static async Task Produce(
        BufferBlock<int> queue, 
        IEnumerable<int> values,
        CancellationToken token
        )
    {
        foreach (var value in values)
        {
            await queue.SendAsync(value, token);
            Console.WriteLine(value);
        }

        queue.Complete();
    }

    private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
    {
        var ret = new List<int>();
        while (await queue.OutputAvailableAsync())
        {
            ret.Add(await queue.ReceiveAsync());
        }

        return ret;
    }

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();

        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

        // Start the producer and consumer.
        var values = Enumerable.Range(0, 100);
        Produce(queue, values, cts.Token);
        var consumer = Consume(queue);

        cts.Cancel();

        try
        {
            Task.WaitAll(consumer, queue.Completion);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }

        foreach (var i in consumer.Result)
        {
            Console.WriteLine(i);
        }

        Console.ReadKey();

关于c# - 我应该如何使用 DataflowBlockOptions.CancellationToken?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29748146/

相关文章:

c# - 有条件地在单独的线程上运行类的方法

c# - 如何从 Windows 窗体的组合框中隐藏特定项目

c# - 如何最大化进程吞吐量(C#)?

c# - 如何让异步流返回两个数据源

c# - 在 C# 中通过 OpenXML 在 word 文件中定义具有 RTL 方向的段落

c# - 在析构函数中处理对象

c# - PredicateBuilder:OR 条件嵌套在 .And() 中

.net - 在 VS 2012 RC 中引用 TPL 数据流和 TPL 的问题

c# - 具有有限容量的转换 block 中的 TPL 数据流异常