我在一个目录中并行处理文件时遇到问题。我已经阅读了几个类似的问题和示例,但我似乎无法找到我的代码导致异常的原因。
我的目录被其他进程填充,并且在任何时候都会包含数千个文件。每个文件都必须被解析和验证,这需要时间文件系统/网络 io 等。我需要这个步骤并行完成,其余的必须串行完成。
这是我的代码:
public void run()
{
XmlMessageFactory factory = new XmlMessageFactory();
DirectoryInfo dir = new DirectoryInfo(m_sourceDir);
Dictionary<string, int> retryList = new Dictionary<string, int>();
ConcurrentQueue<Tuple<XmlMsg,FileInfo>> MsgQueue = new
ConcurrentQueue<Tuple<XmlMsg,FileInfo>>();
//start worker to handle messages
System.Threading.ThreadPool.QueueUserWorkItem(o =>
{
XmlMsg msg;
Tuple<XmlMsg, FileInfo> item;
while (true)
{
if (!MsgQueue.TryDequeue(out item))
{
System.Threading.Thread.Sleep(5000);
continue;
}
try
{
msg = item.Item1;
/* processing on msg happens here */
handleMessageProcessed(item.Item2, ref retryList);
}
catch (Exception e)
{
//if this method is called it gives the
//exception below
handleMessageFailed(item.Item2, e.ToString());
}
}
}
);
while (true)
{
try
{
FileInfo[] files = dir.GetFiles(m_fileTypes);
Partitioner<FileInfo> partitioner = Partitioner.Create(files, true);
Parallel.ForEach(partitioner, f =>
{
try
{
XmlMsg msg = factory.getMessage(messageType);
try
{
msg.loadFile(f.FullName);
MsgQueue.Enqueue(new Tuple<XmlMsg, FileInfo>(msg, f));
}
catch (Exception e)
{
handleMessageFailed(f, e.ToString());
}
}
});
}
}
}
static void handleMessageFailed(FileInfo f, string message)
{
//Erorr here:
f.MoveTo(m_failedDir + f.Name);
//"The process cannot access the file because it is
//being used by another process."} System.Exception {System.IO.IOException}
}
使用 ConcurrentQueue 怎么会同时尝试访问一个文件两次?
我目前有一个包含 5000 个文件的测试设置,每次运行至少会发生一次,并且每次都在不同的文件上发生。当我检查目录时,导致异常的源文件已经被处理并且在“已处理”目录中。
最佳答案
经过相当多的摸索之后,这个问题竟然简单得令人恼火!发生的事情是目录中文件的并行处理在文件的串行事件之前完成,因此循环重新启动并将一些文件重新添加到队列中已经存在的文件中。
为了完整起见,这里是修改后的代码部分:
while (true)
{
try
{
FileInfo[] files = dir.GetFiles(m_fileTypes);
Partitioner<FileInfo> partitioner = Partitioner.Create(files, true);
Parallel.ForEach(partitioner, f =>
{
try
{
XmlMsg msg = factory.getMessage(messageType);
try
{
msg.loadFile(f.FullName);
MsgQueue.Enqueue(new Tuple<XmlMsg, FileInfo>(msg, f));
}
catch (Exception e)
{
handleMessageFailed(f, e.ToString());
}
}
});
//Added check to wait for queue to deplete before
//re-scanning the directory
while (MsgQueue.Count > 0)
{
System.Threading.Thread.Sleep(5000);
}
}
}
关于c# - 如何在.net中同时处理目录中的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21384561/