c# - 如何(以及是否)使用 TPL 编写单一消费者队列?

标签 c# .net .net-4.0 queue task-parallel-library

我最近听到了很多关于 .NET 4.0 中的 TPL 的播客。它们中的大多数描述了后台事件,例如下载图像或进行计算,使用任务以使工作不会干扰 GUI 线程。

我处理的大部分代码更多地具有多生产者/单一消费者的风格,其中来自多个来源的工作项必须排队,然后按顺序处理。一个例子是日志记录,其中来自多个线程的日志行被顺序排列到一个队列中,以便最终写入文件或数据库。来自任何单一来源的所有记录必须保持有序,并且来自同一时刻的记录在最终输出中应该彼此“接近”。

所以多个线程或任务或任何东西都在调用队列:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

一个专用的工作线程处理队列:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

为这些任务的消费者端专门分配一个工作线程似乎总是合理的。我应该改用 TPL 中的某些构造来编写 future 的程序吗?哪一个?为什么?

最佳答案

您可以按照 Wilka 的建议使用长时间运行的任务来处理 BlockingCollection 中的项目。这是一个非常符合您的应用程序要求的示例。你会看到类似这样的输出:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

并不是说 A、B、C 和 D 的输出看起来是随机的,因为它们取决于线程的开始时间,但 B 总是出现在 B1 之前。

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}

关于c# - 如何(以及是否)使用 TPL 编写单一消费者队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2293976/

相关文章:

c# - 在 Windows Phone 7 中获取 EXIF 标签

c# - 将 24 位 bmp 转换为 16 位?

c# 3.0 转换接口(interface)泛型类型

c# - 如何自动聚焦在后台运行的 .Net winform 应用程序

c# - 如何在 .NET 中复制蓝牙的 CCM 方案?

c# - 如何使用 say MEF 导出和导入应用程序服务?

c# - IOException HResult 可能的值

debugging - 为什么 .net 中的对象引用错误异常不告诉我哪个对象为空?

.net - 具有只读属性的 WCF DataContract

asp.net - 如何在web.config中定义using语句?