我将生产者/消费者模式与FileHelpers库一起使用,以使用多个线程从一个文件(可能很大)中导入数据。每个线程都应该导入该文件的一大块,我想使用FileHelperAsyncEngine实例的LineNumber属性,该实例正在读取文件作为导入行的主键。
FileHelperAsyncEngine内部具有IEnumerator IEnumerable.GetEnumerator();。
使用engine.ReadNext()方法进行迭代。在内部设置LineNumber属性(似乎不是线程安全的)。
使用者将具有与生产者相关联的生产者,生产者将向消费者提供DataTables,消费者将通过SqlBulkLoad类消费它们,该类将使用IDataReader实现,该实现将遍历Consumer实例内部的一组数据表。的每个实例都有一个与之关联的SqlBulkCopy实例。
我有线程锁定问题。下面是我如何创建多个Producer线程的方法。我开始每个线程的后记。生产者实例上的生产方法将被称为确定将处理输入文件的哪个块。
似乎engine.LineNumber不是线程安全的,并且我没有在数据库中导入适当的LineNumber。看来到了engine.LineNumber被读取的另一个线程称为engine.ReadNext()并更改了engine.LineNumber属性。我不想锁定应该处理输入文件块的循环,因为我失去了并行性。如何重新组织代码以解决此线程问题?
谢谢
拉德
for (int i = 0; i < numberOfProducerThreads; i++)
DataConsumer consumer = dataConsumers[i];
//create a new producer
DataProducer producer = new DataProducer();
//consumer has already being created
consumer.Subscribe(producer);
FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;
int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;
Thread newThread = new Thread(() =>
{
producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
consumer.SetEndOfData(producer);
});
producerThreads.Add(newThread); thread.Start();}
public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
{
lock (this)
{
engine.Options.IgnoreFirstLines = skipLines;
engine.BeginReadFile(inputFilePath);
}
int rowCount = 1;
DataTable buffer = consumer.BufferDataTable;
while (engine.ReadNext() != null)
{
lock (this)
{
dict[lineNumberFieldName] = engine.LineNumber;
buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
if (rowCount % DataBuffer.MaxBufferRowCount == 0)
{
consumer.AddBufferDataTable(buffer);
buffer = consumer.BufferDataTable;
}
if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
{
break;
}
rowCount++;
}
}
if (buffer.Rows.Count > 0)
{
consumer.AddBufferDataTable(buffer);
}
engine.Close();
}
最佳答案
Dictionary <>不是线程安全的。上面代码中的字典是否已正确锁定或仅在您的锁中使用了?
顺便说一句,我将避免使用lock(this)范式,并使用通用对象来锁定您的代码。您可能会遇到其他与特定资源无关的锁定问题。我在我的博客(Smart Resource Locking in C# .Net for Thread Safe Code)上详细介绍了该问题。高温超导
关于c# - FileHelpers在调用engine.ReadNext()方法和readign engine.LineNumber属性之间的线程锁定问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2623896/