我在 TcpClient
之上使用 SslStream
。不幸的是,“SslStream”不支持同时从多个线程写入或读取。这就是为什么我围绕它编写了自己的包装器:
private ConcurrentQueue<byte> sendQueue;
private volatile bool oSending;
private readonly object writeLock;
public async void Write(byte[] buffer, int offset, int count)
{
if (oSending)
{
lock (writeLock)
{
foreach (var b in buffer)
{
sendQueue.Enqueue(b);
}
}
}
else
{
oSending = true;
await stream.WriteAsync(buffer, offset, count);
oSending = false;
lock (writeLock)
{
if (sendQueue.Count > 0)
{
Write(sendQueue.ToArray(), 0, sendQueue.Count);
sendQueue = new ConcurrentQueue<byte>();
}
}
}
}
其背后的意图如下:
- 如果流空闲,则立即写入流。
- 如果流繁忙,则写入缓冲区。
- 如果流发送返回,则检查队列中是否有数据,并递归发送。
到目前为止,我已经尝试了几种解决方案,但似乎每次都发送太多数据。
P.S.:我知道按字节填充队列并不好,但这只是变得又快又脏。
更新:我根据 Dirk 的评论添加了队列的删除。
最佳答案
更新
使用TPL Dataflow :
using System.Threading.Tasks.Dataflow;
public class DataflowStreamWriter
{
private readonly MemoryStream _stream = new MemoryStream();
private readonly ActionBlock<byte[]> _block;
public DataflowStreamWriter()
{
_block = new ActionBlock<byte[]>(
bytes => _stream.Write(bytes, 0, bytes.Length));
}
public void Write(byte[] data)
{
_block.Post(data);
}
}
这是一种更好的生产者-消费者方法。
每当有人向您的 ConcurrentStreamWriter
写入数据时例如,该数据将被添加到缓冲区中。此方法是线程安全的,多个线程可以同时写入数据。这些是您的制作人。
然后,您就有一个消费者 - 消耗缓冲区中的数据并将其写入流。
一个BlockingCollection<T>
用于生产者和消费者之间的通信。这样,如果没有人生产,消费者就会闲置。每当生产者启动并向缓冲区写入内容时,消费者就会醒来。
消费者是延迟初始化的 - 当且仅当某些数据首次可用时才会创建它。
public class ConcurrentStreamWriter : IDisposable
{
private readonly MemoryStream _stream = new MemoryStream();
private readonly BlockingCollection<byte> _buffer = new BlockingCollection<byte>(new ConcurrentQueue<byte>());
private readonly object _writeBufferLock = new object();
private Task _flusher;
private volatile bool _disposed;
private void FlushBuffer()
{
//keep writing to the stream, and block when the buffer is empty
while (!_disposed)
_stream.WriteByte(_buffer.Take());
//when this instance has been disposed, flush any residue left in the ConcurrentStreamWriter and exit
byte b;
while (_buffer.TryTake(out b))
_stream.WriteByte(b);
}
public void Write(byte[] data)
{
if (_disposed)
throw new ObjectDisposedException("ConcurrentStreamWriter");
lock (_writeBufferLock)
foreach (var b in data)
_buffer.Add(b);
InitFlusher();
}
public void InitFlusher()
{
//safely create a new flusher task if one hasn't been created yet
if (_flusher == null)
{
Task newFlusher = new Task(FlushBuffer);
if (Interlocked.CompareExchange(ref _flusher, newFlusher, null) == null)
newFlusher.Start();
}
}
public void Dispose()
{
_disposed = true;
if (_flusher != null)
_flusher.Wait();
_buffer.Dispose();
}
}
关于c# - 线程安全缓冲区包装流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22225410/