我正在尝试实现某种具有不同任务权重的任务队列,允许不同数量的任务根据其权重同时运行。
任务有两种类型:长任务和短任务。 最多可以同时执行 N 个短任务。
当长任务出现时,如果没有其他长任务正在运行,则应立即启动,否则等待其完成。
如果有长任务运行,并发短任务数限制应减少到 M。 已经运行的短任务应该继续运行直至完成;但如果当前限制小于或等于当前运行的短任务数量,则不应启动新的短任务。
看来,我基本上需要动态更改信号量“容量”的能力。 通过在需要时占用/释放 (N - M) 个“槽”,可以很容易地减少/增加容量,但是如果有 N 个短任务,那么在 (N - M) 个短任务完成之前,这会导致队列“挂起”任务已在运行。
我还可以实现某种“调度程序”,例如每 100 毫秒唤醒一次,并检查队列中是否有任何现在可以启动的任务。这种方法的缺点是任务入队和启动之间有长达 100 毫秒的延迟。
所以我被这个难题困住了,希望有人能对如何实现这个有一些新的想法。
更新: 任务不会产生任何显着的 CPU 负载。 它们实际上是 HTTP 请求。长请求是上传文件,短请求是常见的HTTP请求。
最佳答案
我回答了一个非常相似的问题a few days ago ,您的解决方案几乎完全相同,使用 QueuedTaskScheduler
来自“ParallelExtensionsExtras”
private static void Main(string[] args)
{
int highPriorityMaxConcurrancy = 1
QueuedTaskScheduler qts = new QueuedTaskScheduler();
var highPriortiyScheduler = qts.ActivateNewQueue(0);
var lowPriorityScheduler = qts.ActivateNewQueue(1);
BlockingCollection<HttpRequestWrapper> fileRequest= new BlockingCollection<Foo>();
BlockingCollection<HttpRequestWrapper> commonRequest= new BlockingCollection<Foo>();
List<Task> processors = new List<Task>(2);
processors.Add(Task.Factory.StartNew(() =>
{
Parallel.ForEach(fileRequest.GetConsumingPartitioner(), //.GetConsumingPartitioner() is also from ParallelExtensionExtras, it gives better performance than .GetConsumingEnumerable() with Parallel.ForEeach(
new ParallelOptions() { TaskScheduler = highPriortiyScheduler, MaxDegreeOfParallelism = highPriorityMaxConcurrancy },
ProcessWork);
}, TaskCreationOptions.LongRunning));
processors.Add(Task.Factory.StartNew(() =>
{
Parallel.ForEach(commonRequest.GetConsumingPartitioner(),
new ParallelOptions() { TaskScheduler = lowPriorityScheduler},
ProcessWork);
}, TaskCreationOptions.LongRunning));
//Add some work to do here to the fileRequest or commonRequest collections
//Lets the blocking collections know we are no-longer going to be adding new items so it will break out of the `ForEach` once it has finished the pending work.
fileRequest.CompleteAdding();
commonRequest.CompleteAdding();
//Waits for the two collections to compleatly empty before continueing
Task.WaitAll(processors.ToArray());
}
private static void ProcessWork(HttpRequestWrapper request)
{
//...
}
关于c# - 在 C# 中同步繁重任务和轻任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21340512/