我正在调用一个调用数据库的工作方法,然后迭代并产生返回值以进行并行处理。为了防止它破坏数据库,我在那里有一个 Thread.Sleep 来暂停对数据库的执行。但是,这似乎阻止了 Parallel.ForEach 中仍在发生的执行。实现此目的以防止阻塞的最佳方法是什么?
private void ProcessWorkItems()
{
_cancellation = new CancellationTokenSource();
_cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());
Task.Factory.StartNew(() =>
Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
{
var x = ItemFactory(workItem);
x.doWork();
}), _cancellation.Token);
}
private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
while (!_cancellation.IsCancellationRequested)
{
var workItems = WorkItemRepository.GetItemList(); //database call
workItems.ForEach(item =>
{
item.QueueWorkItem(WorkItemRepository);
});
foreach (var item in workItems)
{
yield return item;
}
if (workItems.Count == 0)
{
Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
}
}
yield break;
}
编辑: 我将其更改为包含答案,但它仍然无法正常工作。我将 .AsParallel().WithDegreeOfParallelism(10) 添加到 GetWorkItems() 调用中。当我认为 Parallel 应该继续执行时,即使基本线程正在休眠,我的期望是否不正确?
例子: 我有 15 个项目,它迭代并抓取 10 个项目并启动它们。当每个完成时,它会从 GetWorkItems 请求另一个,直到它尝试请求第 16 个项目。那时它应该停止尝试获取更多项目,但应该继续处理项目 11-15 直到完成。这就是并行工作的方式吗?因为它目前没有这样做。它当前正在做的是当它完成 6 时,它锁定仍在 Parallel.ForEach 中运行的后续 10。
最佳答案
我建议您创建一个 BlockingCollection (一个队列)工作项,以及一个每 30 秒调用一次数据库以填充它的计时器。像这样的东西:
BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();
初始化时:
System.Threading.Timer WorkItemTimer = new Timer((s) =>
{
var items = WorkItemRepository.GetItemList(); //database call
foreach (var item in items)
{
WorkItems.Add(item);
}
}, null, 30000, 30000);
这将每 30 秒查询一次数据库中的项目。
要安排要处理的工作项,您有许多不同的解决方案。最接近你所拥有的是这样的:
WorkItem item;
while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
Task.Factory.StartNew((s) =>
{
var myItem = (WorkItem)s;
// process here
}, item);
}
这消除了任何线程中的阻塞,并让 TPL 决定如何最好地分配并行任务。
编辑:
实际上,更接近你所拥有的是:
foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
// start task to process item
}
您可以使用:
Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...
我不知道这是否有效或效果如何。也许值得尝试一下 。 . .
编辑结束
一般来说,我的建议是将其视为生产者/消费者应用程序,生产者是定期查询数据库以获取新项目的线程。我的示例每 N(本例中为 30)秒查询一次数据库,如果平均而言,您可以每 30 秒清空一次工作队列,那么该示例将运行良好。从项目发布到数据库到您获得结果,平均延迟不到一分钟。
您可以降低轮询频率(从而降低延迟),但这会导致更多的数据库流量。
你也可以用它变得更漂亮。例如,如果您在 30 秒后轮询数据库并获得大量项目,那么您可能很快就会获得更多,并且您会希望在 15 秒(或更短)后再次轮询。相反,如果您在 30 秒后轮询数据库但没有得到任何结果,那么您可能需要等待更长的时间才能再次轮询。
您可以使用一次性计时器设置这种自适应轮询。也就是说,您在创建计时器时将最后一个参数指定为 -1,这会导致它仅触发一次。您的计时器回调计算出在下一次轮询之前等待多长时间,并调用 Timer.Change
以使用新值初始化计时器。
关于c# - Thread.Sleep 阻塞并行执行任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7561196/