c# - 异步使用 NamedPipeServerStream 和 NamedPipeClientStream

标签 c# .net named-pipes

我对服务器/客户端架构有以下要求:

  1. 编写异步工作的服务器/客户端。

  2. 通信需要是双工的,即两端都可以读取和写入。

  3. 多个客户端可以在任何给定时间连接到服务器。

  4. 服务器/客户端应等待,直到它们可用并最终建立连接。

  5. 客户端连接后,它应该写入流。

  6. 然后服务器应该从流中读取并将响应写回客户端。

  7. 最后,客户端应读取响应,通信应结束。

因此,考虑到以下要求,我编写了以下代码,但我不太确定它,因为管道的文档有些缺乏,不幸的是,代码似乎无法正常工作,它卡在某个点。

namespace PipesAsyncAwait471
{
    using System;
    using System.Collections.Generic;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    {
        private static async Task Main()
        {
            List<Task> tasks = new List<Task> {
                HandleRequestAsync(),
            };

            tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

            await Task.WhenAll(tasks);
        }

        private static async Task HandleRequestAsync()
        {
            using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
                                                                            PipeDirection.InOut,
                                                                            NamedPipeServerStream.MaxAllowedServerInstances,
                                                                            PipeTransmissionMode.Message,
                                                                            PipeOptions.Asynchronous))
            {
                Console.WriteLine("Waiting...");

                await server.WaitForConnectionAsync().ConfigureAwait(false);

                if (server.IsConnected)
                {
                    Console.WriteLine("Connected");

                    if (server.CanRead) {
                        // Read something...
                    }

                    if (server.CanWrite) {
                        // Write something... 

                        await server.FlushAsync().ConfigureAwait(false);

                        server.WaitForPipeDrain();
                    }

                    server.Disconnect();

                    await HandleRequestAsync().ConfigureAwait(false);
                }
            }
        }

        private static async Task SendRequestAsync(int index, int counter, int max)
        {
            using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
            {
                await client.ConnectAsync().ConfigureAwait(false);

                if (client.IsConnected)
                {
                    Console.WriteLine($"Index: {index} Counter: {counter}");

                    if (client.CanWrite) {
                        // Write something...

                        await client.FlushAsync().ConfigureAwait(false);

                        client.WaitForPipeDrain();
                    }

                    if (client.CanRead) {
                        // Read something...
                    }
                }

                if (counter <= max) {
                    await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
                }
                else {
                    Console.WriteLine($"{index} Done!");
                }
            }
        }
    }
}

假设:

我期望它的工作方式是当我调用 SendRequestAsync 时发出的所有请求并发执行,每个请求然后发出额外的请求,直到达到 6 并且最后,它应该打印“完成!”。

备注:

  1. 我在 .NET Framework 4.7.1 和 .NET Core 2.0 上对其进行了测试,得到了相同的结果。

  2. 客户端和服务器之间的通信始终位于计算机本地,其中客户端是可以对某些作业进行排队的 Web 应用程序,例如启动第 3 方进程和服务器将作为 Windows 服务部署在与部署这些客户端的 Web 服务器相同的计算机上。

最佳答案

这是经过一些迭代后的完整代码:

PipeServer.cs:

namespace AsyncPipes;

using System.Diagnostics.CodeAnalysis;
using System.IO.Pipes;

public static class PipeServer
{
    public static void WaitForConnection()
        => WaitForConnectionInitializer();

    private static void WaitForConnectionInitializer()
    {
        var context = new ServerContext();
        var server = context.Server;
    
        try
        {
            Console.WriteLine($"Waiting a client...");

            server.BeginWaitForConnection(WaitForConnectionCallback, context);
        }
        catch
        {
            // We need to cleanup here only when something goes wrong.
            context.Dispose();

            throw;
        }

        static void WaitForConnectionCallback(IAsyncResult result)
        {
            var (context, server, _) = ServerContext.FromResult(result);

            server.EndWaitForConnection(result);

            WaitForConnectionInitializer();

            BeginRead(context);
        }

        static void BeginRead(ServerContext context)
        {
            var (_, server, requestBuffer) = context;
            
            server.BeginRead(requestBuffer, 0, requestBuffer.Length, ReadCallback, context);
        }

        static void BeginWrite(ServerContext context)
        {
            var (_, server, responseBuffer) = context;
            
            server.BeginWrite(responseBuffer, 0, responseBuffer.Length, WriteCallback, context);
        }

        static void ReadCallback(IAsyncResult result)
        {
            var (context, server, requestBuffer) = ServerContext.FromResult(result);

            var bytesRead = server.EndRead(result);

            if (bytesRead > 0)
            {
                if (!server.IsMessageComplete)
                {
                    BeginRead(context);
                }
                else
                {
                    var index = BitConverter.ToInt32(requestBuffer, 0);
                    Console.WriteLine($"{index} Request.");

                    BeginWrite(context);
                }
            }
        }

        static void WriteCallback(IAsyncResult result)
        {
            var (context, server, responseBuffer) = ServerContext.FromResult(result);
            var index = -1;
            
            try
            {
                server.EndWrite(result);
                server.WaitForPipeDrain();
                
                index = BitConverter.ToInt32(responseBuffer, 0);
                Console.WriteLine($"{index} Pong.");
            }
            finally
            {
                context.Dispose();
                Console.WriteLine($"{index} Disposed.");
            }
        }
    }

    private sealed class ServerContext : IDisposable
    {
        [NotNull]
        public byte[]? Buffer { get; private set; } = new byte[4];

        [NotNull]
        public NamedPipeServerStream? Server { get; private set; } = new ("PipesDemo",
                                                                        PipeDirection.InOut,
                                                                        NamedPipeServerStream.MaxAllowedServerInstances,
                                                                        PipeTransmissionMode.Message,
                                                                        PipeOptions.Asynchronous);

        public void Deconstruct(out ServerContext context, out NamedPipeServerStream server, out byte[] buffer)
            => (context, server, buffer) = (this, Server, Buffer);

        public static ServerContext FromResult(IAsyncResult result)
        {
            ArgumentNullException.ThrowIfNull(result.AsyncState);
            
            return (ServerContext)result.AsyncState;
        }
        
        public void Dispose()
        {
            if (Server is not null)
            {
                if (Server.IsConnected)
                {
                    Server.Disconnect();
                }
                
                Server.Dispose();
            }
            
            Server = null;
            Buffer = null;
        }
    }
}

PipeClient:

public static class PipeClient
{
    public static void CreateConnection(int index)
    {
        using var client = new NamedPipeClientStream(".", "PipesDemo", PipeDirection.InOut, PipeOptions.None);
        client.Connect();

        var requestBuffer = BitConverter.GetBytes(index);
        client.Write(requestBuffer, 0, requestBuffer.Length);
        client.Flush();
        client.WaitForPipeDrain();
        Console.WriteLine($"{index} Ping.");

        var responseBuffer = new byte[4];
        var bytesRead = client.Read(responseBuffer, 0, responseBuffer.Length);

        while (bytesRead > 0)
        {
            bytesRead = client.Read(responseBuffer, bytesRead - 1, responseBuffer.Length - bytesRead);
        }

        index = BitConverter.ToInt32(responseBuffer, 0);
        Console.WriteLine($"{index} Response.");
    }
}

Program.cs:

namespace AsyncPipes;

internal class Program
{
    private const int MaxRequests = 1000;

    private static void Main()
    {
        var tasks = new List<Task>
        {
            Task.Run(PipeServer.WaitForConnection)
        };

        tasks.AddRange(Enumerable.Range(0, MaxRequests - 1)
                                .Select(i => Task.Factory.StartNew(() => PipeClient.CreateConnection(i),
                                                                    TaskCreationOptions.LongRunning)));

        Task.WaitAll(tasks.ToArray());

        Console.ReadKey();
    }
}

您可以对消息进行排序并观察以下内容:

  1. 连接已正确打开和关闭。

  2. 数据发送和接收正确。

  3. 最后,服务器仍然等待进一步的连接。

更新:

PipeOptions.Asynchronous 更改为 PipeOptions.None,否则看起来它在请求期间挂起,然后才立即处理它们。

PipeOptions.Asynchronous 只是导致执行顺序与 PipeOptions.None 不同,这会在代码中暴露出竞争条件/死锁。例如,如果您使用任务管理器来监视进程的线程计数,您可以看到它的效果...您应该看到它以每秒 1 个线程的速度爬升,直到达到大约 100 个线程(可能是 110 左右),此时您的代码将运行完成。或者如果您在开头添加 ThreadPool.SetMinThreads(200, 200) 。您的代码存在一个问题,如果发生错误的排序(使用异步更容易出现这种情况),您将创建一个循环,直到有足够的线程来运行您的 main 方法已排队的所有并发 ConnectAsyncs 为止。 ,这并不是真正的异步,而是只是创建一个工作项来调用同步 Connect 方法(这是不幸的,这样的问题是我敦促人们不要公开将工作项简单地排队到的异步 API 的原因之一)调用同步方法)。 Source .

修改并简化了示例:

  1. 管道没有真正的异步 Connect 方法,ConnectAsync 在幕后使用 Task.Factory.StartNew,因此您可能只是也可以使用 Connect,然后将调用同步 Connect 版本的方法(在我们的示例中为 SendRequest)传递给 Task.Factory。开始新的

  2. 服务器现在完全异步,据我所知,它可以正常工作。

  3. 修复了所有 BeginXXX/EndXXX 方法。

  4. 删除了不必要的 try/catch block 。

  5. 删除了不必要的消息。

  6. 稍微重构一下代码,使其更具可读性和简洁性。

  7. 删除了服务器的 async/await 版本,因为我重构了代码,并且没有时间更新 async/await 版本,但是使用上面的版本,您可以了解如何做到这一点以及新的 API 更加友好且易于处理。

希望对您有所帮助。

关于c# - 异步使用 NamedPipeServerStream 和 NamedPipeClientStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48059410/

相关文章:

c# - 将 session 拥有的 SQL Server sp_getapplock 与 EF6 DbContexts 一起使用是否安全?

c# - 在ASP.NET Core 2和Entity Framework Core 2中查询表中附近的位置

c# - 加载位图资源时,.net 5 应用程序不支持 BinaryFormatter

c# - 事件驱动的进程间通信(IPC)

c - 函数 ‘mknod’ 的隐式声明,但我包含了 header

c++ - Windows 相当于 Linux fifo

c# - 配置 MSBuild 以启动 PowerShell 并运行 Import-Module $(TargetFileName)

c# - 反序列化后 JSON 不保留值

.net - 工具条 (ToolStripDropDownButton) 关闭并失去窗口焦点

c# - 试图创建一个包含结构的数组?