我一直在阅读有关线程池模式的内容,但我似乎找不到以下问题的常用解决方案。
我有时希望任务能够串行执行。例如,我从文件中读取文本 block ,由于某种原因,我需要按该顺序处理这些文本 block 。所以基本上我想消除某些任务的并发性。
考虑这种情况,其中带有 *
的任务需要按照推送的顺序进行处理。其他任务可以按照任何顺序进行处理。
push task1
push task2
push task3 *
push task4 *
push task5
push task6 *
....
and so on
在线程池的上下文中,如果没有这个约束,单个待处理任务队列可以正常工作,但显然在这里不行。
我考虑过让一些线程在特定于线程的队列上操作,而其他线程在“全局”队列上操作。然后,为了串行执行某些任务,我只需将它们推送到单个线程查找的队列中即可。它确实听起来有点笨拙。
那么,这个长篇故事中真正的问题是:你将如何解决这个问题? 您如何确保这些任务按顺序排列?
编辑
作为一个更普遍的问题,假设上面的场景变成了
push task1
push task2 **
push task3 *
push task4 *
push task5
push task6 *
push task7 **
push task8 *
push task9
....
and so on
我的意思是,组内的任务应该按顺序执行,但组本身可以混合。例如,您可以使用 3-2-5-4-7
。
另一件事需要注意的是,我无法预先访问组中的所有任务(并且我迫不及待地等待所有任务都到达后再开始组)。
最佳答案
类似下面的内容将允许串行和并行任务排队,其中串行任务将一个接一个地执行,而并行任务将以任何顺序执行,但并行。这使您能够在必要时序列化任务,也可以具有并行任务,但在收到任务时执行此操作,即您不需要预先了解整个序列,执行顺序是动态维护的。
internal class TaskQueue
{
private readonly object _syncObj = new object();
private readonly Queue<QTask> _tasks = new Queue<QTask>();
private int _runningTaskCount;
public void Queue(bool isParallel, Action task)
{
lock (_syncObj)
{
_tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
}
ProcessTaskQueue();
}
public int Count
{
get{lock (_syncObj){return _tasks.Count;}}
}
private void ProcessTaskQueue()
{
lock (_syncObj)
{
if (_runningTaskCount != 0) return;
while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
{
QTask parallelTask = _tasks.Dequeue();
QueueUserWorkItem(parallelTask);
}
if (_tasks.Count > 0 && _runningTaskCount == 0)
{
QTask serialTask = _tasks.Dequeue();
QueueUserWorkItem(serialTask);
}
}
}
private void QueueUserWorkItem(QTask qTask)
{
Action completionTask = () =>
{
qTask.Task();
OnTaskCompleted();
};
_runningTaskCount++;
ThreadPool.QueueUserWorkItem(_ => completionTask());
}
private void OnTaskCompleted()
{
lock (_syncObj)
{
if (--_runningTaskCount == 0)
{
ProcessTaskQueue();
}
}
}
private class QTask
{
public Action Task { get; set; }
public bool IsParallel { get; set; }
}
}
更新
要处理具有串行和并行任务混合的任务组,GroupedTaskQueue
可以为每个组管理一个 TaskQueue
。同样,您不需要预先了解码,它都是在收到任务时动态管理的。
internal class GroupedTaskQueue
{
private readonly object _syncObj = new object();
private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
private readonly string _defaultGroup = Guid.NewGuid().ToString();
public void Queue(bool isParallel, Action task)
{
Queue(_defaultGroup, isParallel, task);
}
public void Queue(string group, bool isParallel, Action task)
{
TaskQueue queue;
lock (_syncObj)
{
if (!_queues.TryGetValue(group, out queue))
{
queue = new TaskQueue();
_queues.Add(group, queue);
}
}
Action completionTask = () =>
{
task();
OnTaskCompleted(group, queue);
};
queue.Queue(isParallel, completionTask);
}
private void OnTaskCompleted(string group, TaskQueue queue)
{
lock (_syncObj)
{
if (queue.Count == 0)
{
_queues.Remove(group);
}
}
}
}
关于c# - 确保ThreadPool中的任务执行顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7192223/