我有一个从多个线程访问的类的实例。此类接受此调用并将元组添加到数据库中。我需要以串行方式完成此操作,因为由于某些数据库约束,并行线程可能会导致数据库不一致。
由于我不熟悉 C# 中的并行性和并发性,所以我这样做了:
private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
public void AddDData(string info)
{
Task t = new Task(() => { InsertDataIntoBase(info); });
_tasks.Add(t);
}
private void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_tasks.IsCompleted)
{
Task t;
if (_tasks.TryTake(out t))
{
t.Start();
t.Wait();
}
}
});
}
AddDData
是被多线程调用的,InsertDataIntoBase
是一个非常简单的插入,只需几毫秒。
问题是,出于某种原因,我缺乏知识,无法弄清楚,有时一个任务会被调用两次!它总是这样:
T1 T2 T3 T1 <- PK 错误。 T4 ...
我对 .Take()
的理解是否完全错误,是我遗漏了什么,还是我的生产者/消费者实现真的很糟糕?
最好的问候, 拉斐尔
更新:
按照建议,我使用此架构进行了快速沙盒测试实现,正如我所怀疑的那样,它不能保证在前一个任务完成之前不会触发任务。
所以问题仍然存在:如何正确地排队任务并按顺序触发它们?
更新 2:
我简化了代码:
private BlockingCollection<Data> _tasks = new BlockingCollection<Data>();
public void AddDData(Data info)
{
_tasks.Add(info);
}
private void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_tasks.IsCompleted)
{
Data info;
if (_tasks.TryTake(out info))
{
InsertIntoDB(info);
}
}
});
}
请注意,我摆脱了 Tasks,因为我依赖于同步的 InsertIntoDB 调用(因为它在一个循环中),但仍然没有运气......这一代很好,我绝对确定只有独特的实例是去排队。但无论我怎么尝试,有时同一个对象会被使用两次。
最佳答案
我认为这应该可行:
private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>();
static void Main(string[] args)
{
InsertWorker();
GenerateItems(10, 1000);
_itemsToProcess.CompleteAdding();
}
private static void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_itemsToProcess.IsCompleted)
{
string t;
if (_itemsToProcess.TryTake(out t))
{
// Do whatever needs doing here
// Order should be guaranteed since BlockingCollection
// uses a ConcurrentQueue as a backing store by default.
// http://msdn.microsoft.com/en-us/library/dd287184.aspx#remarksToggle
Console.WriteLine(t);
}
}
});
}
private static void GenerateItems(int count, int maxDelayInMs)
{
Random r = new Random();
string[] items = new string[count];
for (int i = 0; i < count; i++)
{
items[i] = i.ToString();
}
// Simulate many threads adding items to the collection
items
.AsParallel()
.WithDegreeOfParallelism(4)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select((x) =>
{
Thread.Sleep(r.Next(maxDelayInMs));
_itemsToProcess.Add(x);
return x;
}).ToList();
}
这确实意味着消费者是单线程的,但允许多个生产者线程。
关于c# - 异步生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18749185/