c# - 对于非常长寿的线程,建议使用 TPL

标签 c# multithreading queue task-parallel-library

我阅读了任务并行库 ( http://msdn.microsoft.com/en-us/library/dd537609(v=vs.110).aspx ) 的一些 MSDN 文档,特别是有关 TPL 最佳实践用法的内容。

我有一个启动线程的应用程序。线程的目的是监视队列并“处理”已添加的项目。队列上的项目的处理需要按顺序完成,因此我不希望同时处理队列中的多个项目。该线程与 Windows 进程一样存在,并且仅在应用程序退出时关闭。

我想知道使用 TPL 将此后台线程作为任务启动的优缺点。

我最初的直觉是不使用 TPL,因为我会在正在运行的应用程序的整个生命周期中占用一个线程池线程,这可能会阻止其他任务运行并干扰线程池的最佳运行.

我不确定手动启动新的后台线程来执行工作会如何影响应用程序其他隔离部分中 TPL 的使用。

我想知道在这种情况下手动线程还是任务是推荐的方法?或者我需要考虑哪些其他因素才能做出明智的选择?

我已经包含了下面的代码。请注意,代码中的“处理器”通常执行 CPU 绑定(bind)操作,然后“处理程序”通常执行 IO 绑定(bind)操作:

public class TransactionProcessor : IDisposable
{
    private readonly IProcessorConfiguration configuration;
    private readonly IList<Tuple<string, IProcessor>> processors = new List<Tuple<string, IProcessor>>();
    private readonly IList<Tuple<string, IHandler>> handlers = new List<Tuple<string, IHandler>>(); 
    private AutoResetEvent waitForWork = new AutoResetEvent(true);
    private object lockObject = new object();
    private bool processThreadShouldExit = false;
    private Thread processorThread; 
    private Queue queue = new Queue();

    public TransactionProcessor(IProcessorConfiguration configuration)
    {
        if (configuration == null)
        {
            throw new ArgumentNullException("configuration");
        }

        this.configuration = configuration;
        this.Initialise();
    }

    public void Start()
    {
        lock (this.lockObject)
        {
            if (this.processorThread == null)
            {
                this.processThreadShouldExit = false;
                this.processorThread = new Thread(this.Dispatcher);
                this.processorThread.Start();
            }
        }
    }

    public void Stop()
    {
        if (this.processorThread != null)
        {
            this.processThreadShouldExit = true;
            this.waitForWork.Set();
            this.processorThread.Join();
            this.processorThread = null;
        }
    }   

    public void QueueTransactionForProcessing(Transaction Transaction, Guid clientID)
    {
        var queueObject = new QueueObject() { Transaction = Transaction };

        lock (this.lockObject)
        {
            this.queue.Enqueue(queueObject);
        }

        if (this.waitForWork != null)
        {
            this.waitForWork.Set();
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (this.processorThread != null)
        {
            this.Stop();
        }

        if (this.waitForWork != null)
        {
            this.waitForWork.Dispose();
            this.waitForWork = null;
        }
    }

    private void Dispatcher()
    {
        if (this.queue.Count == 0)
        {
            this.waitForWork.Reset();
        }

        while (!this.processThreadShouldExit)
        {
            if (this.queue.Count > 0 || this.waitForWork.WaitOne(60000))
            {
                while (this.queue.Count > 0 && !this.processThreadShouldExit)
                {
                    QueueObject queueObject;
                    lock (this.lockObject)
                    {
                        queueObject = (QueueObject)this.queue.Dequeue();
                    }

                    this.ProcessQueueItem(queueObject);
                }

                if (this.queue.Count == 0)
                {
                    this.waitForWork.Reset();
                }
            }
        }
    }   

    private void ProcessQueueItem(QueueObject item)
    {
        var correlationId = Guid.NewGuid();
        try
        {
            bool continuePipeline = true;
            foreach (var processor in this.processors)
            {
                processor.Item2.Process(item.Transaction, correlationId, ref continuePipeline);
                if (!continuePipeline)
                {
                    break;
                }
            }

            if (continuePipeline)
            {
                foreach (var handler in this.handlers)
                {
                    Transaction clonedTransaction = item.Transaction.Clone();
                    try
                    {
                        handler.Item2.Handle(clonedTransaction, correlationId);
                    }
                    catch (Exception e)
                    {
                    }
                }
            }
        }
        catch (Exception e)
        {               
        }
    }

    private void Initialise()
    {       
        foreach (var processor in this.configuration.Processors)
        {
            try
            {
                Type processorType = Type.GetType(processor.Value);
                if (processorType != null && typeof(IProcessor).IsAssignableFrom(processorType))
                {
                    var processorInstance = (IProcessor)Activator.CreateInstance(processorType);
                    this.processors.Add(new Tuple<string, IProcessor>(processor.Key, processorInstance));
                }
            catch (Exception e)
            {               
            }
        }

        foreach (var handler in this.configuration.Handlers)
        {
            try
            {
                Type handlerType = Type.GetType(handler.Value);
                if (handlerType != null && typeof(IHandler).IsAssignableFrom(handlerType))
                {
                    var handlerInstance = (IHandler)Activator.CreateInstance(handlerType);
                    this.handlers.Add(new Tuple<string, IHandler>(handler.Key, handlerInstance));
                }
            }
            catch (Exception e)
            {   
            }
        }
    }
}

最佳答案

您应该很少需要使用低级构造,例如Thread,TPL 也可以满足长时间运行的任务,并且使用它会产生更灵活和可维护的代码。

您可以使用以下方式启动长时间运行的任务:

 Task.Factory.StartNew(() => {}, TaskCreationOptions.LongRunning);

来自 MSDN doco:

LongRunning Specifies that a task will be a long-running, coarse-grained operation involving fewer, larger components than fine-grained systems. It provides a hint to the TaskScheduler that oversubscription may be warranted. Oversubscription lets you create more threads than the available number of hardware threads.

因此调度程序可以创建额外的线程以确保ThreadPool容量足够。此外,手动创建线程不会影响您在代码的其他部分使用TPL。

就我个人而言,我肯定会选择 TPL 而不是手动创建线程。有一点学习曲线,但它是一个非常强大的库,可以满足各种各样的场景。

关于c# - 对于非常长寿的线程,建议使用 TPL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25601182/

相关文章:

Java如何使用ListIterator作为队列?

c# - RhinoMocks 错误 "Only assignment, call, increment, decrement, and new object expressions can be used as a statement"

c# - 从不同线程中的类更新文本框

java - 使用可调整大小的数组在 Java 中实现队列 API

python - Python 'multitasking'需要

Python:线程无故停止

python - Queue.Queue 与 collections.deque

c# - 显示消息后关闭应用程序

c# - 为什么具有可为空返回类型的 Func 不适合保存具有对象返回类型的 Func 的字典?

c# - 在 .NET 中解析公式