.net - 如何使用 Socket 和 Reactive 扩展(Rx)从连接的客户端套接字获取接收到的消息缓冲区

标签 .net sockets c#-4.0 system.reactive reactive-programming

因为我对 Rx 有点陌生,并且正在学习如何通过它。我检查了很多例子,但没有一个适合我的需要。

场景:我有一个套接字服务器套接字(使用简单的套接字对象而不是 TCPListener 对象创建)。我有多个客户端(客户端套接字而不是 TCPClient)连接到此服务器套接字。我正在尝试使用 Rx(响应式扩展)通过对 BeginReceive 和 EndReceive 异步操作使用“FromAsyncPattern”运算符来获取客户端套接字发送的消息。

我几乎已经开始在缓冲区中获取消息了。但问题是缓冲区要么返回从先前操作接收到的相同消息,要么将混合来自当前和先前接收操作的内容。

这是使用的代码:

    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];

                var functionReceiveSocketData = Observable.FromAsyncPattern
                                                            <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

                Observable.Defer(() =>
                                functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

OnMessageReceived 方法只是一个委托(delegate),用于引发消息接收事件并将缓冲区发送到其他类以进一步处理。

问题:每次我收到消息时都会使用相同的缓冲区,我如何才能从先前接收的缓冲区中获得干净的消息。

-- 如果我收到部分消息并且下一条消息实际上是同一消息的一部分,应该怎么做。

请用一些非常有用的代码片段帮助我解决上述问题。发布这个问题的原因是因为我到目前为止所经历的实现是使用 TCPListener ,这里的约束是使用套接字对象:(

提前致谢。

最佳答案

您遇到的问题是您在重复调用可观察对象之间共享相同的缓冲区实例。您需要确保每次都有一个新的缓冲区实例。看来您应该根据返回整数截断缓冲区。这是我的建议,需要进入您的if陈述:

var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (socket.BeginReceive, socket.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    bs.CopyTo(rs, 0);
    return rs;
};

Observable
    .Defer(() =>
    {
        var buffer = new byte[400];
        return 
            from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
            select copy(buffer, n);
    })
    .Repeat()
    .Subscribe(x => OnMessageReceived(x));

编辑:响应评论重新加入消息部分。还修复了上面的解决方案 - 应该是 0而不是 nCopyTo copy 中的函数 lambda 。

我不是使用套接字的专家,但是,因为套接字(如果我错了,请纠正我)只是一个字节流,你需要能够确定流中的每条消息何时完成。

这是一种可能的方法:
/* include `functionReceiveSocketData` & `copy` from above */ =

Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
    var rs = new byte[bs1.Length + bs2.Length];
    bs1.CopyTo(rs, 0);
    bs2.CopyTo(rs, bs1.Length);
    return rs;
};

var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));

Observable
    .Defer(() => /* as above */)
    .Repeat()
    .Scan(new byte[] { }, (abs, bs) =>
    {
        var current = append(abs, bs);
        if (isCompleteMessage(current))
        {
            messages.OnNext(current);
            current = new byte[] { };
        }
        return current;
    })
    .Subscribe();

您唯一需要做的就是弄清楚如何实现 isCompleteMessage功能。

关于.net - 如何使用 Socket 和 Reactive 扩展(Rx)从连接的客户端套接字获取接收到的消息缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6839802/

相关文章:

c# 索引超出了数组的范围

c# - 为什么 FileSystemWatcher 在创建文件时引发 Changed 事件?

asp.net-mvc-3 - 此版本的 SQL Server 不支持没有聚集索引的表

c# - 无需注册表即可确定框架版本的方法

c# - 定义继承后的 Protobuf-net 反序列化

java - 输入套接字意外关闭时吞下 IOException

java - 即使使用flush(),我的程序也被卡在/阻塞在 “in.readObject()”中

c# - 线程池中可用线程数

c# - WCF 服务的 basicHttpBinding 上的 HTTPS

python - 在多个端口上运行套接字脚本