c# - 涉及递归的 TPL 数据流未完成

标签 c# async-await task-parallel-library tpl-dataflow

在一个测试 WPF 项目中,我尝试使用 TPL 数据流枚举给定父目录的所有子目录,并创建具有特定文件扩展名的文件列表,例如“.xlsx”。我使用 2 个 block ,第一个 dirToFilesBlock,最后一个 fileActionBlock。

为了实现遍历所有子目录的递归效果,第一个 block 有一个指向自身的链接,并通过链接谓词判断输出项是否为目录。这是我在一本关于异步编程的书中找到的方法。第二个链接指向 fileActionBlock,其链接谓词判断文件是否具有正确的扩展名,符合条件的文件会被添加到列表中。

我遇到的问题是:通过 btnStart_Click 启动之后,数据流永远不会完成。也就是说,事件处理程序中 await 之后的代码永远不会执行,“已完成”消息也就不会显示。我知道可能需要调用 dirToFilesBlock.Complete(),但不清楚应该在代码的哪个位置、在什么条件下调用。我不能在最初的 Post 调用之后立即调用它,因为那样该 block 就不再接受通过反向链接送回的子目录。我尝试过利用 InputCount 和 OutputCount 属性来判断,但效果不佳。如果可能的话,我希望保持数据流的结构不变,因为这样我还可以借助这条反向链接,在每发现一个待遍历的新目录时更新 UI,为用户提供一些进度反馈。

我对 TPL 数据流非常陌生,非常感谢任何帮助。

这是代码隐藏文件中的代码:

// Code-behind for the test window. It walks a directory tree with two linked TPL Dataflow
// blocks and adds every ".xlsx" file it finds to the `files` collection.
// As written, the task returned by Start never completes: nothing in this class ever
// calls Complete() on dirToFilesBlock.
public partial class MainWindow : Window
{
    // Turns a directory path into its file-system entries; linked back to itself in Start.
    TransformManyBlock<string, string> dirToFilesBlock;
    // Receives the entries that pass IsRequiredDocType and hands them to ProcessFile.
    ActionBlock<string> fileActionBlock;
    // Collected file names; assigned as the DataContext of `lst` in the constructor.
    ObservableCollection<string> files;
    // Cancellation source triggered by btnCancel_Click, and the token derived from it.
    CancellationTokenSource cts;
    CancellationToken ct;
    // Sets up the result collection and the cancellation token used by both blocks.
    public MainWindow()
    {
        InitializeComponent();

        files = new ObservableCollection<string>();

        lst.DataContext = files;

        cts = new CancellationTokenSource();
        ct = cts.Token;
    }

    // Builds the two blocks, links them, posts the root path and returns a task that
    // finishes when both blocks have completed.
    private Task Start(string path)
    {
        // Captured here so that fileActionBlock runs ProcessFile on the calling context.
        var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        dirToFilesBlock = new TransformManyBlock<string, string>((Func<string, IEnumerable<string>>)(GetFileSystemItems), new ExecutionDataflowBlockOptions() { CancellationToken = ct });
        fileActionBlock = new ActionBlock<string>((Action<string>)ProcessFile, new ExecutionDataflowBlockOptions() {CancellationToken = ct, TaskScheduler = uiScheduler});

        // Order of LinkTo's important here!
        // First link: directories are fed back into the same block (the recursion).
        // Second link: entries with the required extension go to fileActionBlock.
        // NOTE(review): entries that satisfy neither predicate have no target in this
        // listing - confirm how TPL Dataflow treats messages no link accepts.
        dirToFilesBlock.LinkTo(dirToFilesBlock, new DataflowLinkOptions() { PropagateCompletion = true }, IsDirectory);
        dirToFilesBlock.LinkTo(fileActionBlock, new DataflowLinkOptions() { PropagateCompletion = true }, IsRequiredDocType);

        // Kick off the recursion.
        dirToFilesBlock.Post(path);

        // Complete() is never called on dirToFilesBlock, so neither Completion task
        // can finish on its own; this is the hang described in the question.
        return Task.WhenAll(dirToFilesBlock.Completion, fileActionBlock.Completion);
    }

    // Link predicate: true when `path` is an existing directory.
    private bool IsDirectory(string path)
    {

        return Directory.Exists(path);
    }


    // Link predicate: true when the extension of `fileName` is exactly ".xlsx"
    // (the comparison is case-sensitive).
    private bool IsRequiredDocType(string fileName)
    {
        return System.IO.Path.GetExtension(fileName) == ".xlsx";
    }

    // Returns the file-system entries of `path`, or an empty sequence when the
    // EnumerateFileSystemEntries call throws. Throws if cancellation was requested.
    private IEnumerable<string> GetFilesInDirectory(string path)
    {
        // Check for cancellation with each new dir.
        ct.ThrowIfCancellationRequested();

        // Check in case of Dir access problems
        // NOTE(review): the returned sequence is enumerated by the caller, outside this
        // try block - confirm whether errors raised during enumeration can escape it.
        try
        {
            return Directory.EnumerateFileSystemEntries(path);
        }
        catch (Exception)
        {
            return Enumerable.Empty<string>();
        }
    }

    // Transform delegate of dirToFilesBlock; simply forwards to GetFilesInDirectory.
    private IEnumerable<string> GetFileSystemItems(string dir)
    {
        return GetFilesInDirectory(dir);
    }

    // Action delegate of fileActionBlock: adds the file to the result collection
    // unless cancellation was requested.
    private void ProcessFile(string fileName)
    {
        ct.ThrowIfCancellationRequested();

       files.Add(fileName);
    }

    // Starts the scan of C:\ and reports the outcome in a message box.
    private async void btnStart_Click(object sender, RoutedEventArgs e)
    {
        try
        {
            await Start(@"C:\");
            // Never gets here!!!
            MessageBox.Show("Completed");

        }
        catch (OperationCanceledException)
        {
            MessageBox.Show("Cancelled");

        }
        catch (Exception)
        {
            MessageBox.Show("Unknown err");
        }
        finally
        {
        }
    }

    // Requests cancellation of the running scan.
    private void btnCancel_Click(object sender, RoutedEventArgs e)
    {
        cts.Cancel();
    }
}

}

最佳答案

尽管这是一个老问题,但处理数据流循环中的完成仍然可能是一个问题。

在您的情况下,您可以让 TransformManyBlock 记录仍在处理中(in flight)的元素数量,以此表明该 block 正忙于处理若干项目。然后,只有当该 block 不忙且其输入、输出两个缓冲区都为空时,才调用 Complete()。您可以在我写的文章《Finding Completion in a Complex Flow: Feedback Loops》中找到有关处理完成的更多信息。

/// <summary>
/// Code-behind for the test window. Walks a directory tree with two linked TPL Dataflow
/// blocks and adds every ".xlsx" file it finds to <c>files</c>.
/// </summary>
/// <remarks>
/// <c>dirToFilesBlock</c> is linked back to itself, so it cannot know on its own when the
/// recursion is over. <see cref="Start"/> therefore polls until the block is idle (both
/// buffers empty and no directory being read) and only then calls <c>Complete()</c>.
/// </remarks>
public partial class MainWindow : Window {

    // Turns a directory path into its file-system entries; linked back to itself in Start.
    TransformManyBlock<string, string> dirToFilesBlock;
    // Receives the entries that pass IsRequiredDocType and hands them to ProcessFile.
    ActionBlock<string> fileActionBlock;
    // Collected file names; assigned as the DataContext of `lst` in the constructor.
    ObservableCollection<string> files;
    // Cancellation source triggered by btnCancel_Click, and the token derived from it.
    CancellationTokenSource cts;
    CancellationToken ct;

    // Number of directories whose entries are being read right now. Updated with
    // Interlocked because the transform delegate does not run on the UI thread.
    private int directoriesBeingProcessed = 0;

    /// <summary>Sets up the result collection and the cancellation token.</summary>
    public MainWindow() {
        InitializeComponent();

        files = new ObservableCollection<string>();

        lst.DataContext = files;

        cts = new CancellationTokenSource();
        ct = cts.Token;
    }

    /// <summary>
    /// Builds and links the blocks, posts the root directory and waits until the whole
    /// tree has been walked and every matching file has been processed.
    /// </summary>
    /// <param name="path">Root directory to scan.</param>
    /// <returns>A task that finishes when both blocks have completed.</returns>
    private async Task Start(string path) {
        // Captured here so that fileActionBlock runs ProcessFile on the calling (UI) context.
        var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        dirToFilesBlock = new TransformManyBlock<string, string>((Func<string, IEnumerable<string>>)(GetFileSystemItems), new ExecutionDataflowBlockOptions() { CancellationToken = ct });
        fileActionBlock = new ActionBlock<string>((Action<string>)ProcessFile, new ExecutionDataflowBlockOptions() { CancellationToken = ct, TaskScheduler = uiScheduler });

        // Order of LinkTo's important here!
        // 1. Directories are fed back into the same block (the recursion).
        // 2. Files with the required extension go to fileActionBlock.
        // 3. Everything else is discarded. Without this last link an entry that matches
        //    neither predicate stays at the head of the output buffer, OutputCount never
        //    drops to zero and the block can never finish.
        dirToFilesBlock.LinkTo(dirToFilesBlock, new DataflowLinkOptions() { PropagateCompletion = true }, IsDirectory);
        dirToFilesBlock.LinkTo(fileActionBlock, new DataflowLinkOptions() { PropagateCompletion = true }, IsRequiredDocType);
        dirToFilesBlock.LinkTo(DataflowBlock.NullTarget<string>());

        // Kick off the recursion.
        dirToFilesBlock.Post(path);

        // Complete() must not be called before the feedback loop has drained, otherwise
        // the block would refuse the sub-directories that are linked back to it.
        await ProcessingIsComplete();
        dirToFilesBlock.Complete();
        await Task.WhenAll(dirToFilesBlock.Completion, fileActionBlock.Completion);
    }

    /// <summary>
    /// Polls every 500 ms until <c>dirToFilesBlock</c> is idle or cancellation is requested.
    /// </summary>
    private async Task ProcessingIsComplete() {
        // Keep waiting while the block is still busy (i.e. NOT idle).
        // NOTE(review): this is a polling heuristic; a check that lands exactly between
        // the block taking an item from its input buffer and the counter being
        // incremented could presumably see a false "idle" - confirm whether that window
        // matters for your workload.
        while (!ct.IsCancellationRequested && !DirectoryToFilesBlockIsIdle()) {
            await Task.Delay(500);
        }
    }

    /// <summary>
    /// True when <c>dirToFilesBlock</c> has nothing queued, nothing waiting in its output
    /// buffer and no directory currently being read.
    /// </summary>
    private bool DirectoryToFilesBlockIsIdle() {
        return dirToFilesBlock.InputCount == 0 &&
            dirToFilesBlock.OutputCount == 0 &&
            directoriesBeingProcessed <= 0;
    }

    /// <summary>Link predicate: true when <paramref name="path"/> is an existing directory.</summary>
    private bool IsDirectory(string path) {
        return Directory.Exists(path);
    }

    /// <summary>
    /// Link predicate: true when <paramref name="fileName"/> has the ".xlsx" extension.
    /// The comparison ignores case so that e.g. "REPORT.XLSX" is matched as well.
    /// </summary>
    private bool IsRequiredDocType(string fileName) {
        return string.Equals(System.IO.Path.GetExtension(fileName), ".xlsx", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Reads the file-system entries of <paramref name="path"/> while keeping
    /// <c>directoriesBeingProcessed</c> raised for the whole duration of the read.
    /// </summary>
    /// <param name="path">Directory to read.</param>
    /// <returns>The entries of the directory, or an empty sequence if it cannot be read.</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
    private IEnumerable<string> GetFilesInDirectory(string path) {
        Interlocked.Increment(ref directoriesBeingProcessed);
        try {
            // Check for cancellation with each new dir. Done inside the outer try so the
            // counter is restored even when this throws.
            ct.ThrowIfCancellationRequested();

            // Check in case of Dir access problems
            try {
                // Read eagerly: with a lazy enumeration the actual directory read would
                // happen after the counter has already been decremented.
                return Directory.GetFileSystemEntries(path);
            } catch (Exception) {
                // Best effort: an unreadable directory contributes no entries.
                return Enumerable.Empty<string>();
            }
        } finally {
            Interlocked.Decrement(ref directoriesBeingProcessed);
        }
    }

    /// <summary>Transform delegate of <c>dirToFilesBlock</c>; forwards to <see cref="GetFilesInDirectory"/>.</summary>
    private IEnumerable<string> GetFileSystemItems(string dir) {
        return GetFilesInDirectory(dir);
    }

    /// <summary>
    /// Action delegate of <c>fileActionBlock</c>: adds the file to the result collection
    /// unless cancellation was requested.
    /// </summary>
    private void ProcessFile(string fileName) {
        ct.ThrowIfCancellationRequested();

        files.Add(fileName);
    }

    /// <summary>Starts the scan of C:\ and reports the outcome in a message box.</summary>
    private async void btnStart_Click(object sender, RoutedEventArgs e) {
        try {
            await Start(@"C:\");
            // Reached once the whole tree has been walked and both blocks have completed.
            MessageBox.Show("Completed");

        } catch (OperationCanceledException) {
            MessageBox.Show("Cancelled");

        } catch (Exception) {
            MessageBox.Show("Unknown err");
        }
    }

    /// <summary>Requests cancellation of the running scan.</summary>
    private void btnCancel_Click(object sender, RoutedEventArgs e) {
        cts.Cancel();
    }
}

关于c# - 涉及递归的 TPL 数据流未完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37054930/

相关文章:

c# - 如何将 Panel 的内容复制到 richTextBox?

Flutter Firestore getDocuments() 不适用于 initState

javascript - 等待后在 map 函数下循环序列中断

task-parallel-library - 为什么 TaskFactory.StartNew 与 Task.ContinueWith 结合使用?

c# - 多对多 TPL 数据流不处理所有输入

c# - 取消自动完成字段的任务不会取消所有以前的任务

c# - 我可以自动填充 MongoDB 中的 LastUpdated 字段吗(使用 C# 驱动程序)

c# - 如何在 Roslyn 中用 var 替换字符串变量?

c# - 是否可以从Button调用Command?

python-3.x - 通过对 Python 3.6 Flask 应用程序中的路由的 HTTP 请求启动非阻塞异步函数调用