c# - FileHelpers在调用engine.ReadNext()方法和readign engine.LineNumber属性之间的线程锁定问题

标签 c# multithreading locking filehelpers

我将生产者/消费者模式与FileHelpers库一起使用,以使用多个线程从一个文件(可能很大)中导入数据。每个线程都应该导入该文件的一大块,我想使用FileHelperAsyncEngine实例的LineNumber属性,该实例正在读取文件作为导入行的主键。
FileHelperAsyncEngine内部具有IEnumerator IEnumerable.GetEnumerator();。
使用engine.ReadNext()方法进行迭代。在内部设置LineNumber属性(似乎不是线程安全的)。

使用者将具有与生产者相关联的生产者,生产者将向消费者提供DataTables,消费者将通过SqlBulkLoad类消费它们,该类将使用IDataReader实现,该实现将遍历Consumer实例内部的一组数据表。的每个实例都有一个与之关联的SqlBulkCopy实例。

我有线程锁定问题。下面是我如何创建多个Producer线程的方法。我开始每个线程的后记。生产者实例上的生产方法将被称为确定将处理输入文件的哪个块。
似乎engine.LineNumber不是线程安全的,并且我没有在数据库中导入适当的LineNumber。看来到了engine.LineNumber被读取的另一个线程称为engine.ReadNext()并更改了engine.LineNumber属性。我不想锁定应该处理输入文件块的循环,因为我失去了并行性。如何重新组织代码以解决此线程问题?

谢谢
拉德

            for (int i = 0; i < numberOfProducerThreads; i++)
            DataConsumer consumer = dataConsumers[i];

            //create a new producer
            DataProducer producer = new DataProducer();

            //consumer has already being created
            consumer.Subscribe(producer);

            FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
            orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
            orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;

            int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;

            Thread newThread = new Thread(() =>
            {
                producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
                consumer.SetEndOfData(producer);
            }); 
            producerThreads.Add(newThread); thread.Start();}

    public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
    {
        lock (this)
        {
            engine.Options.IgnoreFirstLines = skipLines;
            engine.BeginReadFile(inputFilePath);
        }

        int rowCount = 1;

        DataTable buffer = consumer.BufferDataTable;
        while (engine.ReadNext() != null)
        {
            lock (this)
            {
                dict[lineNumberFieldName] = engine.LineNumber;
                buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
                if (rowCount % DataBuffer.MaxBufferRowCount == 0)
                {
                    consumer.AddBufferDataTable(buffer);
                    buffer = consumer.BufferDataTable;
                }
                if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
                {
                    break;
                }
                rowCount++;
            }
        }
        if (buffer.Rows.Count > 0)
        {
            consumer.AddBufferDataTable(buffer);
        }
        engine.Close();
    }

最佳答案

Dictionary <>不是线程安全的。上面代码中的字典是否已正确锁定或仅在您的锁中使用了?

顺便说一句,我将避免使用lock(this)范式,并使用通用对象来锁定您的代码。您可能会遇到其他与特定资源无关的锁定问题。我在我的博客(Smart Resource Locking in C# .Net for Thread Safe Code)上详细介绍了该问题。高温超导

关于c# - FileHelpers在调用engine.ReadNext()方法和readign engine.LineNumber属性之间的线程锁定问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2623896/

相关文章:

c# - 如何访问 web api Controller 中的 mvc Controller 以从 View 中获取 pdf

C 编程多线程段错误

sql-server-2008 - 防止多行出现竞争条件

c# - SqlDependency/Query 通知 - SQL Server 重启

c# - 如何处理它在任务中创建的对象?

c# - 更改 WCF Web Api HttpContext 响应

Java:同步套接字输入

java - while(true) 循环或标准程序循环的 java.util.Timer?

Gradle 没有考虑依赖锁定文件

MySQL:如何保持锁定并使其他线程等待尚未发生的插入