我有一个允许数千个客户端连接的套接字应用程序。它将它们存储在 ConcurrentDictionary<int, Socket>
并且仅在请求-响应情况下运行:
像这样:
public Task<Message> Request(int clientId, Message message)
{
Socket client;
return Clients.TryGetValue(clientId, out client)
? RequestInternal(client, message);
: _EmptyTask;
}
public async Task<Message> RequestInternal(Socket client, Message message)
{
await SendAsync(client, message).ConfigureAwait(false);
return await ReceiveOneAsync(client).ConfigureAwait(false);
}
现在我需要更改此应用程序以允许客户随时向我发送任何内容;即使没有我提出要求。这 - 我认为 - 将需要不断地从套接字接收和完全不同的方法。
问题:
我的想法:
免责声明:这部分有点长,完全是假设性的。如果您对上述问题有答案,则可以跳过。
我的想法是:
BlockingCollection<Message>
. BlockingCollection
处理收到的消息的GetConsumingEnumerable
方法。 处理线程将执行此操作:
foreach (var message in Messages.GetConsumingEnumerable())
ProcessMessage(message);
有了这个,我可以接收和处理客户端发送的所有内容,但将发送来回复我的请求的消息与发送的消息区分开来,因为客户端需要这将是一个问题。
我想我可以随请求发送一个唯一标识符字节(该特定客户端唯一)。然后客户端可以在响应中将该标识符发回给我,我可以使用它来区分响应。
ProcessMessage(Message msg)
{
// msg is a message from msg.Sender.
if (msg.Id == 0)
{
// msg is not a response, do processing.
}
else
{
// msg is a response to the message that's sent with msg.Id.
// Find the request that:
// * ...is made to msg.Sender
// * ...and has the msg.Id as identifier.
// And process the response according to that.
}
}
这意味着我还必须存储请求。
这是
RequestInternal
的假设版本:编辑:已替换
Wait
与 await
通话在斯蒂芬克利里的回答之后。private async Task RequestInternal(Socket client, Message message)
{
var request = new Request(client, message);
Requests.Add(request);
await SendAsync(client, message).ConfigureAwait(false);
return await request.Source.Task.ConfigureAwait(false);
}
和
Request
类(class):private sealed class Request
{
public readonly byte Id;
public readonly Socket Client;
public readonly Message Message;
public readonly TaskCompletionSource<Message> Source;
public Request(Socket client, Message message)
{
Client = client;
Message = message;
Source = new TaskCompletionSource<Message>();
// Obtain a byte unique to that socket...
Id = GetId(client);
}
}
和
ProcessMessage
变成这样:ProcessMessage(Message msg)
{
if (msg.Id == 0)
OnReceived(msg); // To raise an event.
else
{
// Method to find a request using msg.Sender and msg.Id
var request = Requests.Find(msg);
if (request != null)
request.Source.SetResult(msg);
}
}
虽然不知道是什么样的收藏类型
Requests
将会。编辑:我用过
ConcurrentDictionary<Key, Request>
在哪里 Key
是具有 Int32
的私有(private)结构(套接字的 ID)和 Byte
(消息的 ID)字段。它还实现了 IEquatable<T>
.
最佳答案
我写了一个TCP/IP .NET Sockets FAQ几年前,它解决了一些常见问题(例如 message framing 、 continuous reading 和 explanations of common errors )。代码示例均使用 Socket
类,但相同的概念适用于所有 TCP/IP 套接字。
关于您的协议(protocol)设计和请求/响应匹配,整体方法听起来不错。您需要确保您是线程安全的(例如, Requests
可能是 ConcurrentDictionary
)。此外,您应该 await
SendAsync
而不是调用Wait
.
我已经尝试过但尚未投入生产的另一种方法是基于TPL Dataflow。 .您可以为每个客户端创建一个表示“输出”的 block ,为“输入”创建另一个 block 。然后,您可以在其上分层您的消息框架,并在其上分层您的请求/响应匹配,然后将任何剩余的(未经请求的)消息发送到单个共享 BufferBlock
.
因此,您的“最终用户”API 最终将如下所示:
// Send a request and asynchronously receive a matching response.
Task<Message> RequestAsync(int clientId, Message message);
// Endpoint for unsolicited messages.
IReceivableSourceBlock<Tuple<int, Message>> UnsolicitedMessages { get; }
然后您可以连接
ActionBlock
至UnsolicitedMessages
每当有人进来时执行委托(delegate)。
关于c# - 允许处理无响应消息的请求-响应逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13990736/