我对服务器/客户端架构有以下要求:
编写异步工作的服务器/客户端。
通信需要是双工的,即两端都可以读取和写入。
多个客户端可以在任何给定时间连接到服务器。
服务器/客户端应等待,直到它们可用并最终建立连接。
客户端连接后,它应该写入流。
然后服务器应该从流中读取并将响应写回客户端。
最后,客户端应读取响应,通信应结束。
因此,考虑到以下要求,我编写了以下代码,但我不太确定它,因为管道的文档有些缺乏,不幸的是,代码似乎无法正常工作,它卡在某个点。
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private static async Task Main()
{
List<Task> tasks = new List<Task> {
HandleRequestAsync(),
};
tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));
await Task.WhenAll(tasks);
}
private static async Task HandleRequestAsync()
{
using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous))
{
Console.WriteLine("Waiting...");
await server.WaitForConnectionAsync().ConfigureAwait(false);
if (server.IsConnected)
{
Console.WriteLine("Connected");
if (server.CanRead) {
// Read something...
}
if (server.CanWrite) {
// Write something...
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
}
server.Disconnect();
await HandleRequestAsync().ConfigureAwait(false);
}
}
}
private static async Task SendRequestAsync(int index, int counter, int max)
{
using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
{
await client.ConnectAsync().ConfigureAwait(false);
if (client.IsConnected)
{
Console.WriteLine($"Index: {index} Counter: {counter}");
if (client.CanWrite) {
// Write something...
await client.FlushAsync().ConfigureAwait(false);
client.WaitForPipeDrain();
}
if (client.CanRead) {
// Read something...
}
}
if (counter <= max) {
await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
}
else {
Console.WriteLine($"{index} Done!");
}
}
}
}
}
假设:
我期望它的工作方式是当我调用 SendRequestAsync
时发出的所有请求并发执行,每个请求然后发出额外的请求,直到达到 6
并且最后,它应该打印“完成!”。
备注:
我在 .NET Framework 4.7.1 和 .NET Core 2.0 上对其进行了测试,得到了相同的结果。
客户端和服务器之间的通信始终位于计算机本地,其中客户端是可以对某些作业进行排队的 Web 应用程序,例如启动第 3 方进程和服务器将作为 Windows 服务部署在与部署这些客户端的 Web 服务器相同的计算机上。
最佳答案
这是经过一些迭代后的完整代码:
PipeServer.cs:
namespace AsyncPipes;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipes;
public static class PipeServer
{
public static void WaitForConnection()
=> WaitForConnectionInitializer();
private static void WaitForConnectionInitializer()
{
var context = new ServerContext();
var server = context.Server;
try
{
Console.WriteLine($"Waiting a client...");
server.BeginWaitForConnection(WaitForConnectionCallback, context);
}
catch
{
// We need to cleanup here only when something goes wrong.
context.Dispose();
throw;
}
static void WaitForConnectionCallback(IAsyncResult result)
{
var (context, server, _) = ServerContext.FromResult(result);
server.EndWaitForConnection(result);
WaitForConnectionInitializer();
BeginRead(context);
}
static void BeginRead(ServerContext context)
{
var (_, server, requestBuffer) = context;
server.BeginRead(requestBuffer, 0, requestBuffer.Length, ReadCallback, context);
}
static void BeginWrite(ServerContext context)
{
var (_, server, responseBuffer) = context;
server.BeginWrite(responseBuffer, 0, responseBuffer.Length, WriteCallback, context);
}
static void ReadCallback(IAsyncResult result)
{
var (context, server, requestBuffer) = ServerContext.FromResult(result);
var bytesRead = server.EndRead(result);
if (bytesRead > 0)
{
if (!server.IsMessageComplete)
{
BeginRead(context);
}
else
{
var index = BitConverter.ToInt32(requestBuffer, 0);
Console.WriteLine($"{index} Request.");
BeginWrite(context);
}
}
}
static void WriteCallback(IAsyncResult result)
{
var (context, server, responseBuffer) = ServerContext.FromResult(result);
var index = -1;
try
{
server.EndWrite(result);
server.WaitForPipeDrain();
index = BitConverter.ToInt32(responseBuffer, 0);
Console.WriteLine($"{index} Pong.");
}
finally
{
context.Dispose();
Console.WriteLine($"{index} Disposed.");
}
}
}
private sealed class ServerContext : IDisposable
{
[NotNull]
public byte[]? Buffer { get; private set; } = new byte[4];
[NotNull]
public NamedPipeServerStream? Server { get; private set; } = new ("PipesDemo",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
public void Deconstruct(out ServerContext context, out NamedPipeServerStream server, out byte[] buffer)
=> (context, server, buffer) = (this, Server, Buffer);
public static ServerContext FromResult(IAsyncResult result)
{
ArgumentNullException.ThrowIfNull(result.AsyncState);
return (ServerContext)result.AsyncState;
}
public void Dispose()
{
if (Server is not null)
{
if (Server.IsConnected)
{
Server.Disconnect();
}
Server.Dispose();
}
Server = null;
Buffer = null;
}
}
}
PipeClient:
public static class PipeClient
{
public static void CreateConnection(int index)
{
using var client = new NamedPipeClientStream(".", "PipesDemo", PipeDirection.InOut, PipeOptions.None);
client.Connect();
var requestBuffer = BitConverter.GetBytes(index);
client.Write(requestBuffer, 0, requestBuffer.Length);
client.Flush();
client.WaitForPipeDrain();
Console.WriteLine($"{index} Ping.");
var responseBuffer = new byte[4];
var bytesRead = client.Read(responseBuffer, 0, responseBuffer.Length);
while (bytesRead > 0)
{
bytesRead = client.Read(responseBuffer, bytesRead - 1, responseBuffer.Length - bytesRead);
}
index = BitConverter.ToInt32(responseBuffer, 0);
Console.WriteLine($"{index} Response.");
}
}
Program.cs:
namespace AsyncPipes;
internal class Program
{
private const int MaxRequests = 1000;
private static void Main()
{
var tasks = new List<Task>
{
Task.Run(PipeServer.WaitForConnection)
};
tasks.AddRange(Enumerable.Range(0, MaxRequests - 1)
.Select(i => Task.Factory.StartNew(() => PipeClient.CreateConnection(i),
TaskCreationOptions.LongRunning)));
Task.WaitAll(tasks.ToArray());
Console.ReadKey();
}
}
您可以对消息进行排序并观察以下内容:
连接已正确打开和关闭。
数据发送和接收正确。
最后,服务器仍然等待进一步的连接。
更新:
将 PipeOptions.Asynchronous
更改为 PipeOptions.None
,否则看起来它在请求期间挂起,然后才立即处理它们。
PipeOptions.Asynchronous 只是导致执行顺序与 PipeOptions.None 不同,这会在代码中暴露出竞争条件/死锁。例如,如果您使用任务管理器来监视进程的线程计数,您可以看到它的效果...您应该看到它以每秒 1 个线程的速度爬升,直到达到大约 100 个线程(可能是 110 左右),此时您的代码将运行完成。或者如果您在开头添加 ThreadPool.SetMinThreads(200, 200) 。您的代码存在一个问题,如果发生错误的排序(使用异步更容易出现这种情况),您将创建一个循环,直到有足够的线程来运行您的 main 方法已排队的所有并发 ConnectAsyncs 为止。 ,这并不是真正的异步,而是只是创建一个工作项来调用同步 Connect 方法(这是不幸的,这样的问题是我敦促人们不要公开将工作项简单地排队到的异步 API 的原因之一)调用同步方法)。 Source .
修改并简化了示例:
管道没有真正的异步
Connect
方法,ConnectAsync
在幕后使用Task.Factory.StartNew
,因此您可能只是也可以使用Connect
,然后将调用同步Connect
版本的方法(在我们的示例中为SendRequest
)传递给Task.Factory。开始新的
。服务器现在完全异步,据我所知,它可以正常工作。
修复了所有 BeginXXX/EndXXX 方法。
删除了不必要的 try/catch block 。
删除了不必要的消息。
稍微重构一下代码,使其更具可读性和简洁性。
删除了服务器的 async/await 版本,因为我重构了代码,并且没有时间更新 async/await 版本,但是使用上面的版本,您可以了解如何做到这一点以及新的 API 更加友好且易于处理。
希望对您有所帮助。
关于c# - 异步使用 NamedPipeServerStream 和 NamedPipeClientStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48059410/