c# - 实现异步 "read all currently available data from stream"操作

标签 c# asynchronous console stream filestream

我最近提供了这个问题的答案:C# - Realtime console output redirection .

正如经常发生的那样,解释东西(这里的“东西”是我解决类似问题的方式)可以让您更好地理解和/或,就像这里的情况一样,“哎呀”时刻。我意识到我的解决方案在实现时有一个错误。该错误几乎没有实际意义,但对我作为开发人员来说却非常重要:知道我的代码有可能爆炸,我不能轻松。

这个问题的目的是消除错误。我为冗长的介绍道歉,所以让我们弄脏。

我想构建一个允许我从控制台标准输出接收输入的类 Stream .控制台输出流的类型为 FileStream ;如果需要,实现可以转换为。还有一个相关的StreamReader已经存在以利用。

我只需要在这个类中实现一件事来实现我想要的功能:异步“读取此时可用的所有数据”操作。读取到流的末尾是不可行的,因为除非进程关闭控制台输出句柄,否则流不会结束,并且它不会这样做,因为它是交互式的并且在继续之前需要输入。

我将使用这个假设的异步操作来实现基于事件的通知,这对我的调用者来说会更方便。

该类的公共(public)接口(interface)是这样的:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}
StartSendingEventsStopSendingEvents做他们宣传的事情;出于本次讨论的目的,我们可以假设事件总是在不失一般性的情况下被发送。

该类在内部使用这两个字段:
    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

该类的功能在以下方法中实现。让球滚动:
    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

Stream 中读取数据没有阻塞,也不需要回车符,BeginRead叫做:
    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

挑战部分:
BeginRead需要使用缓冲区。这意味着从流中读取时,可供读取的字节(“传入块”)可能大于缓冲区。
请记住,此处的目标是读取所有块并为每个块仅调用一次事件订阅者。

为此,如果缓冲区满了 EndRead ,我们不会立即将其内容发送给订阅者,而是将它们附加到 StringBuilder . StringBuilder的内容仅在没有更多内容可从流中读取时才发送回。
    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

在任何不足以填充缓冲区的读取之后(在这种情况下,我们知道在上次读取操作期间没有更多数据要读取),所有累积的数据都发送给订阅者:
    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(我知道只要没有订阅者,数据就会永远累积。这是一个深思熟虑的决定)。



这个方案几乎完美地工作:
  • 不产生任何线程的异步功能
  • 非常方便的调用代码(只需订阅一个事件)
  • 每次可读取数据时不会超过一个事件
  • 几乎与缓冲区大小无关

  • 坏了

    最后一个几乎是一个非常大的。考虑当传入的块的长度正好等于缓冲区的大小时会发生什么。该块将被读取和缓冲,但不会触发事件。紧随其后的是 BeginRead期望找到更多属于当前块的数据,以便将其全部发回,但是......流中将没有更多数据。

    实际上,只要将数据以长度恰好等于缓冲区大小的块的形式放入流中,数据就会被缓冲并且永远不会触发事件。

    这种情况在实践中不太可能发生,特别是因为我们可以为缓冲区大小选择任意数字,但问题就在这里。

    解决方案?

    不幸的是,在检查了 FileStream 上的可用方法后和 StreamReader ,我找不到任何可以让我窥视流同时还允许在其上使用异步方法的东西。

    一种“解决方案”是让线程等待 ManualResetEvent在检测到“缓冲区已填充”条件后。如果事件没有在短时间内(通过异步回调)发出信号,那么来自流的更多数据将不会出现,到目前为止积累的数据应该发送给订阅者。然而,这引入了对另一个线程的需求,需要线程同步,而且是不雅的。

    BeginRead 指定超时也足够了(不时调用我的代码,这样我就可以检查是否有要发回的数据;大多数时候不会有任何事情要做,所以我预计性能影响可以忽略不计)。但看起来 FileStream 不支持超时.

    因为我认为带有超时的异步调用是裸 Win32 中的一个选项,另一种方法可能是 PInvoke 解决问题。但这也是不可取的,因为它会引入复杂性并且只会给代码带来麻烦。

    有没有一种优雅的方法来解决这个问题?

    感谢您有足够的耐心阅读所有这些。

    更新:

    我在最初的文章中肯定没有很好地传达场景。从那以后,我对这篇文章进行了相当多的修改,但要特别确定:

    问题是关于如何实现异步“读取当前可用的所有数据”操作。

    我向那些花时间阅读和回答的人道歉,而我没有明确表达我的意图。

    最佳答案

    理论上,我同意杰森的观点;在大块数据可被您的缓冲区整除的情况下,您的实现比具有逻辑漏洞的问题更大。我看到的最大问题是您的读者必须对文件类型有足够的了解,才能知道如何将数据分成订阅者知道如何处理的“块”。

    流没有关于它们接收或发送什么的固有知识;只有他们传输数据的机制。 NetworkStream 可能正在发送 HTML 或 ZIP 文件; FileStream 可能正在读取文本文件或 MP3。具有此知识的是阅读器(XmlReader、TextReader、Image.FromStream() 等)。因此,您的异步阅读器必须至少了解有关数据的一些信息,但最好不要将这些知识进行硬编码。

    为了处理“流”数据,增量发送必须单独有用;您必须充分了解您所获得的信息,即您获得的是可单独处理的“块”。我的建议是以封装的方式将这些信息提供给您的异步阅读器,方法是让您的订阅者告诉您,或者提供一些与监听器分开的特定格式的“chunkifier”(因为此阅读器正在收听控制台输出,以及所有听众应该以同样的方式对待它,这第二个计划可能会更好)。

    一个逻辑实现:

    public class MyStreamManager {
        public delegate bool ValidChunkTester(StringBuilder builder);
    
        private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
        public event ValidChunkTester IsValidChunk
        { add{validators.Add(value);} remove {validators.Remove(value);}}
    
        public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;
    
    
        public void StartSendingEvents();
        public void StopSendingEvents();
    }
    
    ...
    
    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }
    
        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);
    
        if (validators.Any() && StandardOutputRead !-= null 
                && validators.Aggregate(true, (valid, validator)=>valid && validator(inputAccumulator))) {
            this.OnInputRead(); // send when all listeners can work with the buffer contents
        }
    
        this.BeginReadAsync(); // continue "looping" with BeginRead
    }
    
    ...
    

    该模型要求订阅者不得修改 StringBuilder;如果您愿意,您可以提供一些不可变的东西供他们检查。一个示例监听器可能是:
    public bool IsACompleteLine(StringBuilder builder)
    {
        return builder.Contains(Environment.NewLine);
    }
    

    或者:
    public bool Contains256Bytes(StringBuilder builder)
    {
        return builder.Length >= 256;
    }
    

    ......你明白了。确定要释放给监听器的当前缓冲区的值(value)的事件在概念上与监听器本身是分开的,但不必具体如此,因此它将支持单个特定于输出的测试或多个基于监听器的测试。

    关于c# - 实现异步 "read all currently available data from stream"操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4504868/

    相关文章:

    ruby-on-rails - 在 Rails 控制台中使用 fixture finder helpers

    c# - 为什么推荐继承Exception而不是ApplicationException

    c# - 无论如何要避免将 PostSharp 集成到 Visual Studio 中?

    c# - LINQ-to-SQL 查询何时执行?

    javascript - 从数组中顺序执行一堆 WinJS promise

    c# - 控制台等待鼠标左键按下

    c# - 是否可以为多种文化创建一个 MSI?

    c# - 遍历任务列表 C#

    r - 获取 R 中异步 future 的子进程的 PID

    ruby-on-rails-3 - 如何从 rails 控制台运行初始化程序?