c# - 确保ThreadPool中的任务执行顺序

标签 c# multithreading design-patterns concurrency threadpool

我一直在阅读有关线程池模式的内容,但我似乎找不到以下问题的常用解决方案。

我有时希望任务能够串行执行。例如,我从文件中读取文本 block ,由于某种原因,我需要按该顺序处理这些文本 block 。所以基本上我想消除某些任务的并发性

考虑这种情况,其中带有 * 的任务需要按照推送的顺序进行处理。其他任务可以按照任何顺序进行处理。

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

在线程池的上下文中,如果没有这个约束,单个待处理任务队列可以正常工作,但显然在这里不行。

我考虑过让一些线程在特定于线程的队列上操作,而其他线程在“全局”队列上操作。然后,为了串行执行某些任务,我只需将它们推送到单个线程查找的队列中即可。它确实听起来有点笨拙。

那么,这个长篇故事中真正的问题是:你将如何解决这个问题? 您如何确保这些任务按顺序排列

编辑

作为一个更普遍的问题,假设上面的场景变成了

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

我的意思是,组内的任务应该按顺序执行,但组本身可以混合。例如,您可以使用 3-2-5-4-7

另一件事需要注意的是,我无法预先访问组中的所有任务(并且我迫不及待地等待所有任务都到达后再开始组)。

最佳答案

类似下面的内容将允许串行和并行任务排队,其中串行任务将一个接一个地执行,而并行任务将以任何顺序执行,但并行。这使您能够在必要时序列化任务,也可以具有并行任务,但在收到任务时执行此操作,即您不需要预先了解整个序列,执行顺序是动态维护的。

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

更新

要处理具有串行和并行任务混合的任务组,GroupedTaskQueue 可以为每个组管理一个 TaskQueue。同样,您不需要预先了解码,它都是在收到任务时动态管理的。

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

关于c# - 确保ThreadPool中的任务执行顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7192223/

相关文章:

c# - 取消订阅套接字流后取消特定的子线程或任务

javascript - react : how to force state changes to take place after setState

c# - 局部变量 C#

javascript - 以 JSON 形式返回字典并使用 JS 函数成功访问其内容 - C#

c# - 可以在使用 asp.net 按钮回发后运行 javascript 函数吗?

c# - C#中的多线程压缩

multithreading - OCaml 中的线程延迟和键盘事件

c++ - 维护当前选定的对象是否很好地使用状态模式?

java - 数字模式截止

c# - 未找到 ManagementBaseObject