c# - 如何聚合来自异步生产者的数据并将其写入文件?

标签 c# .net task-parallel-library async-await producer-consumer

我正在学习 C# 中的异步/等待模式。目前我正在尝试解决这样的问题:

  • 有一个生产者(硬件设备)每秒生成 1000 个数据包。我需要将这些数据记录到一个文件中。

  • 设备只有一个ReadAsync() 方法来一次报告一个数据包。

  • 我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒一次。

  • 如果在下一批数据包准备好写入时写入过程没有及时完成,则写入操作应该失败。

到目前为止,我已经写了类似下面的内容。它有效,但我不确定这是否是解决问题的最佳方法。有什么意见或建议吗?解决此类生产者/消费者问题(消费者需要聚合从生产者那里收到的数据)的最佳做法是什么?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}

最佳答案

您可以使用以下想法,前提是刷新的标准是数据包的数量(最多 1000 个)。我没有测试它。它利用了 Stephen Cleary 的 AsyncProducerConsumerQueue<T> 特色 this question .

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
    }
}

如果您不想让记录器失败,而是更愿意取消刷新并继续下一批,您可以通过对此代码进行最小的更改来实现。

回应@l3arnon 的评论:

  1. A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow. 4. You await previousFlush for errors just after you throw an exception which makes that line redundant. etc. In short: I think the possible added value doesn't justify this very complicated solution.
  1. “一个数据包不是一个字节,它是字节[]”——一个数据包一个字节,这从OP的代码中显而易见:buffer[i] = await device.ReadAsync() .那么,一批数据包就是byte[] .
  2. “您还没有使用 OP 的 ToHexString。” - 目标是展示如何使用 Stream.WriteAsync native 接受取消 token ,而不是 WriteLineAsync不允许取消。使用 ToHexString 很简单与 Stream.WriteAsync并且仍然利用取消支持:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
        Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
    
  3. “AsyncProducerConsumerQueue 的健壮性和测试远不如 .Net 的 TPL 数据流”- 我认为这不是一个确定的事实。但是,如果 OP 关心它,他可以使用常规 BlockingCollection ,它不会阻塞生产者线程。在等待下一批时阻塞消费者线程是可以的,因为写入是并行完成的。与此相反,您的 TPL 数据流版本带有一个 冗余 CPU 和锁定密集型操作:使用 logAction.Post(packet) 将数据从生产者管道移动到写入者管道, 逐字节。我的代码不这样做。

  4. “您在抛出异常后等待 previousFlush 以查找错误,这使得该行变得多余。” - 这条线不是多余的。也许,你错过了这一点:previousFlush.IsCompleted可以是true什么时候previousFlush.IsFaultedpreviousFlush.IsCancelled也是true .所以,await previousFlush与观察已完成任务的任何错误(例如,写入失败)相关,否则这些错误将丢失。

关于c# - 如何聚合来自异步生产者的数据并将其写入文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24055900/

相关文章:

c# - 无法在 C# 中使用线程更新进度条

c# - 如何清除我使用过一次的 WCF HTTP 客户端的 PooledBufferManager?

c# - 异步等待 block 主 UI

c# - 如何使数组协方差在 F# 中工作

c# - 等待的任务以取消状态结束不会抛出

.net - 如何在完成后跟进 Parallel.ForEach?

c# - 在 C# wpf 应用程序中处理线程

c# - 使用响应式(Reactive)扩展按计划运行任务

c# - 无法加载文件或程序集 'Microsoft.Web.Iis.Rewrite.Providers, Version=7.1.761.0, Culture=neutral, PublicKeyToken=0545b0627da60a5f'

c# - 使用异步 lambda 的并行 foreach