c# - 如何在.net中同时处理目录中的文件

标签 c# .net multithreading

我在一个目录中并行处理文件时遇到问题。我已经阅读了几个类似的问题和示例,但我似乎无法找到我的代码导致异常的原因。

我的目录被其他进程填充,并且在任何时候都会包含数千个文件。每个文件都必须被解析和验证,这需要时间文件系统/网络 io 等。我需要这个步骤并行完成,其余的必须串行完成。

这是我的代码:

public void run()
{
    XmlMessageFactory factory = new XmlMessageFactory();
    DirectoryInfo dir = new DirectoryInfo(m_sourceDir);
    Dictionary<string, int> retryList = new Dictionary<string, int>();
    ConcurrentQueue<Tuple<XmlMsg,FileInfo>> MsgQueue = new
                                      ConcurrentQueue<Tuple<XmlMsg,FileInfo>>();

    //start worker to handle messages
    System.Threading.ThreadPool.QueueUserWorkItem(o =>
        {
            XmlMsg msg;
            Tuple<XmlMsg, FileInfo> item;
            while (true)
            {
                if (!MsgQueue.TryDequeue(out item))
                {
                    System.Threading.Thread.Sleep(5000);
                    continue;
                }
                try
                {
                    msg = item.Item1;
                    /* processing on msg happens here */
                    handleMessageProcessed(item.Item2, ref retryList);
                }
                catch (Exception e)
                {
                    //if this method is called it gives the 
                    //exception below
                    handleMessageFailed(item.Item2, e.ToString()); 
                }
            }
        }
    );

    while (true)
    {
        try
        {
            FileInfo[] files = dir.GetFiles(m_fileTypes);
            Partitioner<FileInfo> partitioner = Partitioner.Create(files, true);
            Parallel.ForEach(partitioner, f => 
            {
                try
                {
                    XmlMsg msg = factory.getMessage(messageType);
                    try
                    {
                        msg.loadFile(f.FullName);
                        MsgQueue.Enqueue(new Tuple<XmlMsg, FileInfo>(msg, f));
                    }
                    catch (Exception e)
                    {
                        handleMessageFailed(f, e.ToString());
                    }
                }
            });
        }
    }
}

static void handleMessageFailed(FileInfo f, string message)
{
    //Erorr here: 
    f.MoveTo(m_failedDir + f.Name);
    //"The process cannot access the file because it is 
    //being used by another process."}  System.Exception {System.IO.IOException}
}

使用 ConcurrentQueue 怎么会同时尝试访问一个文件两次?

我目前有一个包含 5000 个文件的测试设置,每次运行至少会发生一次,并且每次都在不同的文件上发生。当我检查目录时,导致异常的源文件已经被处理并且在“已处理”目录中。

最佳答案

经过相当多的摸索之后,这个问题竟然简单得令人恼火!发生的事情是目录中文件的并行处理在文件的串行事件之前完成,因此循环重新启动并将一些文件重新添加到队列中已经存在的文件中。

为了完整起见,这里是修改后的代码部分:

while (true)
    {
        try
        {
            FileInfo[] files = dir.GetFiles(m_fileTypes);
            Partitioner<FileInfo> partitioner = Partitioner.Create(files, true);
            Parallel.ForEach(partitioner, f => 
            {
                try
                {
                    XmlMsg msg = factory.getMessage(messageType);
                    try
                    {
                        msg.loadFile(f.FullName);
                        MsgQueue.Enqueue(new Tuple<XmlMsg, FileInfo>(msg, f));
                    }
                    catch (Exception e)
                    {
                        handleMessageFailed(f, e.ToString());
                    }
                }
            });
            //Added check to wait for queue to deplete before 
            //re-scanning the directory
            while (MsgQueue.Count > 0)
            {
                System.Threading.Thread.Sleep(5000);
            }
        }
    }

关于c# - 如何在.net中同时处理目录中的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21384561/

相关文章:

.net - 使用 Entity Framework 代码优先我是否需要拥有我的域贫血模型?

java - 信号量实现的面向生产者消费者的线程池

c# - 截图后 PhantomJS 巨大的内存消耗

c# - 如果我知道 API 在某个时候执行 I/O,我是否应该异步调用 brownfield API?

c# - 查看/收集 OnPrem Service Fabric ETW 事件

.net - .net 中带点的语义网址

c# - 异步运行 .net 事件?

c# - C# 中的队列和等待句柄

java - 具有多线程和公共(public)资源的批处理的建议架构

c# - Fluently Hibernate DB2SQL配置