c# - 任务并行库中的优先队列

标签 c# multithreading task-parallel-library priority-queue producer-consumer

是否有将任务添加到具有不同优先级的 TPL 运行时的任何先前工作?

如果不是,一般来说,我将如何实现?

理想情况下,我计划使用生产者-消费者模式将“待办事项”工作添加到 TPL。有时我可能会发现低优先级作业需要升级到高优先级作业(相对于其他作业)。

如果有人有一些我在搜索时应该使用的搜索关键字,请提及它们,因为我还没有找到可以满足我需要的代码。

最佳答案

所以这是围绕一个相当简单的优先级队列的一个相当简单的并发实现。这里的想法是,有一个排序集保存真实项目和优先级的成对,但被赋予一个只比较优先级的比较器。构造函数采用计算给定对象优先级的函数。

至于实际实现,它们没有有效地实现,我只是lock周围的一切。创建更高效​​的实现将阻止使用 SortedSet作为优先级队列,重新实现其中一个可以有效并发访问的队列并不是那么容易。

为了更改项目的优先级,您需要从集合中删除该项目然后再次添加它,并且要在不迭代整个集合的情况下找到它,您需要知道旧的优先级以及新的优先级。

public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>
{
    private object key = new object();
    private SortedSet<Tuple<T, int>> set;

    private Func<T, int> prioritySelector;

    public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
    {
        this.prioritySelector = prioritySelector;
        set = new SortedSet<Tuple<T, int>>(
            new MyComparer<T>(comparer ?? Comparer<T>.Default));
    }

    private class MyComparer<T> : IComparer<Tuple<T, int>>
    {
        private IComparer<T> comparer;
        public MyComparer(IComparer<T> comparer)
        {
            this.comparer = comparer;
        }
        public int Compare(Tuple<T, int> first, Tuple<T, int> second)
        {
            var returnValue = first.Item2.CompareTo(second.Item2);
            if (returnValue == 0)
                returnValue = comparer.Compare(first.Item1, second.Item1);
            return returnValue;
        }
    }

    public bool TryAdd(T item)
    {
        lock (key)
        {
            return set.Add(Tuple.Create(item, prioritySelector(item)));
        }
    }

    public bool TryTake(out T item)
    {
        lock (key)
        {
            if (set.Count > 0)
            {
                var first = set.First();
                item = first.Item1;
                return set.Remove(first);
            }
            else
            {
                item = default(T);
                return false;
            }
        }
    }

    public bool ChangePriority(T item, int oldPriority, int newPriority)
    {
        lock (key)
        {
            if (set.Remove(Tuple.Create(item, oldPriority)))
            {
                return set.Add(Tuple.Create(item, newPriority));
            }
            else
                return false;
        }
    }

    public bool ChangePriority(T item)
    {
        lock (key)
        {
            var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));

            if (object.Equals(result.Item1, item))
            {
                return ChangePriority(item, result.Item2, prioritySelector(item));
            }
            else
            {
                return false;
            }
        }
    }

    public void CopyTo(T[] array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array[index++] = item;
            }
        }
    }

    public T[] ToArray()
    {
        lock (key)
        {
            return set.Select(pair => pair.Item1).ToArray();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return ToArray().AsEnumerable().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array.SetValue(item, index++);
            }
        }
    }

    public int Count
    {
        get { lock (key) { return set.Count; } }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public object SyncRoot
    {
        get { return key; }
    }
}

一旦你有了 IProducerConsumerCollection<T>实例,即上述对象,您可以将其用作 BlockingCollection<T> 的内部支持对象为了拥有更易于使用的用户界面。

关于c# - 任务并行库中的优先队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14941027/

相关文章:

c# - 有 "single-item-sized async task buffer"这样的同步工具吗?

c# - MySqlDataAdapter错误: Type of value has a mismatch with column type

c# - 为什么我不能使用 "Type"在 C# 中创建新变量?

安卓设计 : How to run 3 different threads in Background for monitoring 3 different devices

android - 为什么viewModelScope.launch默认运行在主线程上

c# - 从 Azure Application Insights 忽略端点

java - 在 Java 中命令线程的执行

c# - AttachedToParent 任务混淆

c# - Task.StartNew Parallel.ForEach 不等待

c# - 匿名并行任务计时器?