c# - 线程安全缓冲区包装流

标签 c# .net multithreading concurrency buffer

我在 TcpClient 之上使用 SslStream。不幸的是,“SslStream”不支持同时从多个线程写入或读取。这就是为什么我围绕它编写了自己的包装器:

private ConcurrentQueue<byte> sendQueue;

private volatile bool oSending;

private readonly object writeLock;


public async void Write(byte[] buffer, int offset, int count)
{
  if (oSending)
  {
    lock (writeLock)
    {
      foreach (var b in buffer)
      {
        sendQueue.Enqueue(b);
      }
    }
  }
  else
  {
    oSending = true;
    await stream.WriteAsync(buffer, offset, count);
    oSending = false;

    lock (writeLock)
    {
      if (sendQueue.Count > 0)
      {
        Write(sendQueue.ToArray(), 0, sendQueue.Count);
        sendQueue = new ConcurrentQueue<byte>();
      }
    }
  }
}

其背后的意图如下:

  1. 如果流空闲,则立即写入流。
  2. 如果流繁忙,则写入缓冲区。
  3. 如果流发送返回,则检查队列中是否有数据,并递归发送。

到目前为止,我已经尝试了几种解决方案,但似乎每次都发送太多数据。

P.S.:我知道按字节填充队列并不好,但这只是变得又快又脏。

更新:我根据 Dirk 的评论添加了队列的删除。

最佳答案

更新

使用TPL Dataflow :

using System.Threading.Tasks.Dataflow;

public class DataflowStreamWriter
{
    private readonly MemoryStream _stream = new MemoryStream();
    private readonly ActionBlock<byte[]> _block;

    public DataflowStreamWriter()
    {
        _block = new ActionBlock<byte[]>(
                        bytes => _stream.Write(bytes, 0, bytes.Length));
    }

    public void Write(byte[] data)
    {
        _block.Post(data);
    }
}

这是一种更好的生产者-消费者方法。

每当有人向您的 ConcurrentStreamWriter 写入数据时例如,该数据将被添加到缓冲区中。此方法是线程安全的,多个线程可以同时写入数据。这些是您的制作人

然后,您就有一个消费者 - 消耗缓冲区中的数据并将其写入流。

一个BlockingCollection<T>用于生产者和消费者之间的通信。这样,如果没有人生产,消费者就会闲置。每当生产者启动并向缓冲区写入内容时,消费者就会醒来。

消费者是延迟初始化的 - 当且仅当某些数据首次可用时才会创建它。

public class ConcurrentStreamWriter : IDisposable
{
    private readonly MemoryStream _stream = new MemoryStream();
    private readonly BlockingCollection<byte> _buffer = new BlockingCollection<byte>(new ConcurrentQueue<byte>());

    private readonly object _writeBufferLock = new object();
    private Task _flusher;
    private volatile bool _disposed;

    private void FlushBuffer()
    {
        //keep writing to the stream, and block when the buffer is empty
        while (!_disposed)
            _stream.WriteByte(_buffer.Take());

        //when this instance has been disposed, flush any residue left in the ConcurrentStreamWriter and exit
        byte b;
        while (_buffer.TryTake(out b))
            _stream.WriteByte(b);
    }

    public void Write(byte[] data)
    {
        if (_disposed)
            throw new ObjectDisposedException("ConcurrentStreamWriter");

        lock (_writeBufferLock)
            foreach (var b in data)
                _buffer.Add(b);

        InitFlusher();
    }

    public void InitFlusher()
    {
        //safely create a new flusher task if one hasn't been created yet
        if (_flusher == null)
        {
            Task newFlusher = new Task(FlushBuffer);
            if (Interlocked.CompareExchange(ref _flusher, newFlusher, null) == null)
                newFlusher.Start();
        }
    }

    public void Dispose()
    {
        _disposed = true;
        if (_flusher != null)
            _flusher.Wait();

        _buffer.Dispose();
    }
}

关于c# - 线程安全缓冲区包装流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22225410/

相关文章:

c# - 为什么对象和集合初始值设定项的组合使用 Add 方法?

c# - 模拟 IOptionsMonitor

c# - 在 XAML ResourceDictionary 中绑定(bind)

c# - 是否可以使用 WIX 创建多语言安装程序?

c# - 控制台输出到文本框

c# - 绑定(bind)命令的正确方法

c# - 当缺少System指令时,如何解析类型别名?

c# - 创建线程前台 VS Thread.Join()

Java线程间共享引用

c# - 更好的技术 : Reading Data in a Thread