我在这里遇到了挑战,寻找解决方案让我头疼。
我有一个List
某事,我执行 Parallel.ForEach
基于它:
List<Customer> customers = GetNotProcessedCostumer();
Parallel.ForEach(customers, new ParallelOptions {MaxDegreeOfParallelism = 2},
cust=>
{
ExecuteSomething(cust);
});
这里的问题是我需要打电话GetNotProcessedCostumer
再次检查数据库上是否有新的未处理项目可用,同时该并行仍在运行。
再次调用该方法就可以了,但是,如何在 List
中插入新项目?并行已经在使用?
换句话说,List<Customer>
是活着的,我需要一直在其上插入项目,并尝试使用现有 Parallel
中的可用线程。 。看看:
List<Customer> customers = GetNotProcessCustomer // get not processed customers from database
Parallel.ForEach(customers) // ...... Start the parallel ...
customer.Add(GetNotProcessCustomer()) // Read database again..
“嘿 Parallel,您有可用的线程吗?”如果是,请使用它。
我可以接受其他方法和想法,例如 Threads
, ThreadPool
......
有人可以帮我吗?
最佳答案
可能有比 Parallel
更好的方法来完成这项工作。类,具有 ActionBlock<Customer>
来自TPL Dataflow图书馆是最有前途的候选人。但是,如果您想使用已有的知识来完成工作,您可以为并行循环提供延迟的 IEnumerable<Customer>
序列而不是物化 List<Customer>
。此序列将查询数据库并在永无休止的循环中生成未处理的客户。添加 Task.Delay
可能是个好主意在混合中,以确保数据库的查询频率不会超过每 X 秒一次。
IEnumerable<Customer> GetNotProcessedCustomersNonStop(
CancellationToken cancellationToken = default)
{
while (true)
{
var delayTask = Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
foreach (var customer in GetNotProcessedCustomers())
yield return customer;
delayTask.GetAwaiter().GetResult();
}
}
添加 CancellationToken
在混合中可能也是一个好主意,因为最终你想停止循环,不是吗?
如果您不熟悉延迟可枚举序列和 yield
声明,你可以看一下这个文档:Iterators
最后一个重要细节是告诉Parallel
您不希望它做一些花哨的事情,例如贪婪地枚举可枚举项并缓存其项目。您希望它仅在准备好处理下一个客户时才吸引下一个客户。您可以通过抛出 Partitioner.Create
来做到这一点在混合中。将所有内容放在一起:
var cts = new CancellationTokenSource();
var source = Partitioner.Create(GetNotProcessedCustomersNonStop(cts.Token),
EnumerablePartitionerOptions.NoBuffering);
var parallelOptions = new ParallelOptions()
{
MaxDegreeOfParallelism = 2,
CancellationToken = cts.Token,
};
Parallel.ForEach(source, parallelOptions, customer =>
{
ProcessCustomer(customer);
});
//cts.Cancel(); // eventually...
关于c# - 基于需要在执行时间内接收更多项目的列表执行 Parallel.ForEach,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69858193/