c# - 如何检测未知并发任务的完成推送和拉取并发队列 <T>

标签 c# .net concurrency task task-parallel-library

几天前,我尝试在我的磁盘上执行快速搜索,做了一些事情,比如属性、扩展、在文件中执行更改等......

我们的想法是使其具有非常少的限制/锁定,以避免大文件或内部包含大量文件的目录等的“延迟”...... 我知道这离“最佳实践”还很远,因为我没有使用“MaxDegreeOfParallelism”或带有“while(true)”的拉循环

尽管如此,代码运行得非常快,因为我们有支持它的架构。

如果有人想检查发生了什么,我尝试将代码转移到虚拟控制台项目。

class Program
{
    static ConcurrentQueue<String> dirToCheck;
    static ConcurrentQueue<String> fileToCheck;
    static int fileCount; //

    static void Main(string[] args)
    {
        Initialize();

        Task.Factory.StartNew(() => ScanDirectories(), TaskCreationOptions.LongRunning);
        Task.Factory.StartNew(() => ScanFiles(), TaskCreationOptions.LongRunning);

        Console.ReadLine();
    }

    static void Initialize()
    {
        //Instantiate caches
        dirToCheck = new ConcurrentQueue<string>();
        fileToCheck = new ConcurrentQueue<string>();

        //Enqueue Directory to Scan here
        //Avoid to Enqueue Nested/Sub directories, else they are going to be dcan at least twice
        dirToCheck.Enqueue(@"C:\");

        //Initialize counters
        fileCount = 0;
    }

    static void ScanDirectories()
    {
        String dirToScan = null;

        while (true)
        {
            if (dirToCheck.TryDequeue(out dirToScan))
            {
                ExtractDirectories(dirToScan);
                ExtractFiles(dirToScan);
            }

            //Just here as a visual tracker to have some kind an idea about what's going on and where's the load
            Console.WriteLine(dirToCheck.Count + "\t\t" + fileToCheck.Count + "\t\t" + fileCount);
        }
    }

    static void ScanFiles()
    {
        while (true)
        {
            String fileToScan = null;
            if (fileToCheck.TryDequeue(out fileToScan))
            {
                CheckFileAsync(fileToScan);
            }
        }
    }

    private static Task ExtractDirectories(string dirToScan)
    {
        Task worker = Task.Factory.StartNew(() =>
        {
            try
            {
                Parallel.ForEach<String>(Directory.EnumerateDirectories(dirToScan), (dirPath) =>
                {
                    dirToCheck.Enqueue(dirPath);
                });

            }
            catch (UnauthorizedAccessException) { }
        }, TaskCreationOptions.AttachedToParent);

        return worker;
    }

    private static Task ExtractFiles(string dirToScan)
    {
        Task worker = Task.Factory.StartNew(() =>
        {
            try
            {
                Parallel.ForEach<String>(Directory.EnumerateFiles(dirToScan), (filePath) =>
                {
                    fileToCheck.Enqueue(filePath);
                });
            }
            catch (UnauthorizedAccessException) { }
        }, TaskCreationOptions.AttachedToParent);

        return worker;
    }

    static Task CheckFileAsync(String filePath)
    {
        Task worker = Task.Factory.StartNew(() =>
        {
            //Add statement to play along with the file here
            Interlocked.Increment(ref fileCount);


            //WARNING !!! If your file fullname is too long this code may not be executed or may just crash
            //I just put a simple check 'cause i found 2 or 3 different error message between the framework & msdn documentation
            //"Full paths must not exceed 260 characters to maintain compatibility with Windows operating systems. For more information about this restriction, see the entry Long Paths in .NET in the BCL Team blog"
            if (filePath.Length > 260)
                return;
            FileInfo fi = new FileInfo(filePath);

            //Add statement here to use FileInfo

        }, TaskCreationOptions.AttachedToParent);

        return worker;
    }
}

问题: 我如何才能检测到我已完成 ScanDirectory? 完成后,我可以设法将一个空字符串或其他任何内容排入文件队列,以退出它。 我知道如果我使用“AttachedToParent”我可以在父任务上有一个完成状态,然后例如做一些像“ContinueWith(()=> {/SomeCode to notice the end/} )" 但是父任务仍然在进行拉取并陷入一种无限循环,每个子语句都开始新的任务。

另一方面,我不能简单地在每个队列中测试“Count”,因为我可能已经刷新了文件列表和目录列表,但可能还有另一个任务将调用“EnumerateDirectory()”。

我试图找到某种“响应式(Reactive)”解决方案并避免在循环内出现一些“if()”,因为它是一个带有 AsyncCall 的简单 while(true){},因此在 80% 的时间内都将被检查为空。

PS:我知道我可以使用 TPL 数据流,我不是因为我被困在 .net 4.0 上,无论如何,在没有数据流的 .net 4.5 中,因为 TPL 几乎没有改进,我仍然很好奇关于它

最佳答案

而不是 ConcurrentQueue<T> , 你可以使用 BlockingCollection<T> .

BlockingCollection<T>专为此类生产者/消费者场景而设计,并提供 CompleteAdding方法,以便生产者可以通知消费者它已完成添加工作。

关于c# - 如何检测未知并发任务的完成推送和拉取并发队列 <T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12925053/

相关文章:

.net - 如何在 C# 中使用操作动词进行命令行解析?

concurrency - 分布式系统上的邮箱处理器

C 并发程序输出取决于输出到标准输出还是文件

c# - 在 wpf 数据网格组合框列中显示一些静态值

c# - 每当属性的值发生变化时引发事件?

c# - 如何使我的 appDomain 生命周期更长?

java - 从 hashmap 更改对象会影响多线程中的 wait() 方法吗?

c# - 外部jar文件包含在monodroid的java绑定(bind)库中吗?

c# - 从 DynamicResource 为 BorderBrush 设置动画会使用该画笔为所有内容设置动画

c# - 使用最小起订量为复杂操作创建模拟引用和设置方法