c# - 使用 TaskCompletionSource 作为 WaitHandle 的替代品是否可以接受?

标签 c# asynchronous async-await

我的代码处理与远程主机的 TCP 连接,带有 ConcurrentQueue存储传出消息。它旨在在单个线程中运行。连接的生命周期包含在 RunAsync 中而一个单独的对象包含连接的“公共(public)状态”:

class PublicState
{
    internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
    internal TaskCompletionSource<Object> OutgoingMessageTcs = null;

    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessages(IEnumerable<Message> messages)
    {
        foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
        if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
        this.OutgoingMessageTcs.SetResult( null );
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
    using( TcpClient tcp = new TcpClient() )
    {
        await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[ 4096 ];

        using( NetworkStream ns = tcp.GetStream() )
        {
            state.ConnectedTcs.SetResult( null );

            Task<Int32> nsReadTask = null;

            while( tcp.Connected )
            {
                if( !state.writeQueue.IsEmpty )
                {
                    await WriteMessagesAsync( ... ).ConfigureAwait( false );
                }

                if( ns.DataAvailable )
                {
                    await ReadMessagesAsync( ... ).ConfigureAwait( false );
                }

                // Wait for new data to arrive from remote host or for new messages to send:
                if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
                if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );

                Task c = await Task.WhenAny( state.OutgoingMessageTcs,  nsReadTask ).ConfigureAwait( false );
                if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
                else if( c == nsReadTask ) nsReadTask = null;
            }
        }
    }
}

这样使用:

public async Task Main(String[] args)
{
    PublicState state = new PublicState();
    Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );

    await state.ConnectedTcs.Task; // awaits until TCP connection is established

    state.EnqueueMessage( new Message("foo") );
    state.EnqueueMessage( new Message("bar") );
    state.EnqueueMessage( new Message("baz") );

    await clientTask; // awaits until the TCP connection is closed
}

此代码有效,但我不喜欢它:感觉我在使用 TaskCompletionSource这意味着代表一个实际的任务或某种后台操作,而我实际上使用的是 TaskCompletionSource作为一种廉价EventWaitHandle .我没有使用 EventWaitHandle因为它是IDisposable (我不想冒泄露 native 资源的风险)并且它缺少 WaitAsyncWaitOneAsync方法。我可以使用 SemaphoreSlim (这是可等待的,但包装了一个 EventWaitHandle )但我的代码并不真正代表信号量的良好使用。

我用的是TaskCompletionSource<T>可以接受,或者是否有更好的方法在 RunAsync 中“取消等待”执行当一个项目被添加到 OutgoingMessageQueue

我觉得它“错误”的另一个原因是 TaskCompletionSource<T>只能使用一次,然后需要更换。我很想避免多余的分配。

最佳答案

如果我对您的理解正确 - TPL BufferBlock 可能就是您所需要的。当前的Enqueue类比为Post,您可以通过ReceiveAsync扩展方法接收下一个值。

所以有了 BufferBlock,你的代码就变成了这样:

class PublicState {
    internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessage(Message message) {
        this.OutgoingMessageQueue.Post(message);
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
    using (TcpClient tcp = new TcpClient()) {
        await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[4096];

        using (NetworkStream ns = tcp.GetStream()) {
            state.ConnectedTcs.SetResult(null);

            Task<Int32> nsReadTask = null;
            Task<Message> newMessageTask = null;
            while (tcp.Connected) {
                // Wait for new data to arrive from remote host or for new messages to send:
                if (nsReadTask == null)
                    nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
                if (newMessageTask == null)
                    newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
                var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
                if (completed == newMessageTask) {
                    var result = await newMessageTask;
                    // do stuff
                    newMessageTask = null;
                }
                else {
                    var bytesRead = await nsReadTask;
                    nsReadTask = null;
                }
            }
        }
    }
}

作为奖励,这个版本(我认为)是线程安全的,而您当前的版本不是,因为您正在使用 OutgoingMessageTcs 从潜在的多个线程(线程RunAsyncEnqueueMessages 调用者的线程)。

如果出于某种原因您不喜欢 BufferBlock - 您可以使用 Nito.AsyncEx nuget 包中的 AsyncCollection 以完全相同的方式.初始化变为:

internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());

并获取:

if (newMessageTask == null)
   newMessageTask = state.OutgoingMessageQueue.TakeAsync();

关于c# - 使用 TaskCompletionSource 作为 WaitHandle 的替代品是否可以接受?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49727015/

相关文章:

java - 为什么 Java 没有异步/等待?

c# - Java 相当于 C# 的 StreamReader.ReadToEnd()

c# - 尝试更改 MainPage 但新页面上的 WebView 无法加载站点。但是当我在应用程序启动时将其设置为主页时它成功了

c# - 使用 async/await 是否比使用 task.Start() 更好,为什么?

c# - 在 ASP.NET MVC 中嵌套异步/等待调用

c# - 不等待异步方法调用可以吗?

javascript - 异步等待和非异步函数的问题

c# - 调用 xamarin 的目标已引发异常

c# - 如何将我的 ImageButton 添加到 ToolStrip

javascript - 尝试在 WordPress 网站上的脚本标记后添加异步