c# - 可观察网络 IO 解析

标签 c# system.reactive

我正在尝试使用 Rx 从 TCPClient 接收流中读取并将数据解析为字符串的 IObservable,由换行符“\r\n”分隔以下是我如何从套接字流接收...

var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

这是我用来解析字符串的方法。这目前不起作用...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

消息主题以 block 的形式接收消息,因此我尝试将它们连接起来并测试连接后的字符串是否包含换行符,从而指示缓冲区关闭并输出缓冲的 block 。不知道为什么它不工作。似乎我只从 obsStrings 中获取了第一个 block 。

所以我正在寻找两件事。我想简化 io 流的读取并消除消息主题的使用。其次,我想让我的字符串解析工作。我已经对此进行了一段时间的黑客攻击,但无法提出可行的解决方案。我是 Rx 的初学者。

编辑:这是问题解决后的成品....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
            .Where(x => x.EndsWith("\r\n"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("\n", ""));

“ReceiveUntilCompleted”是 RXX 项目的扩展。

最佳答案

messages
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
    .Where(x => x.EndsWith("\r\n"))

关于c# - 可观察网络 IO 解析,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10402770/

相关文章:

.net - onCompleted 和 onNext 未到达

C# 使用 AddDays 初始化日期

c# - 从 C# 中的同一个流中读取多次

f# 可观察的 fork 和副作用

c# - 响应式(Reactive) UI/响应式(Reactive)扩展 : how to clear ObservableAsPropertyHelper

c# - 通过可观察序列线程论证

c# - 控制文档中未给出串行命令的电机

c# - 在 blazor 中绑定(bind)动态值的最佳方法是什么

c# - 我如何为文本字段生成动态 ID 并从 asp.net 4 mvc3 中的模型数组获取数据

c# - 接收 : Pairing window duration with count of events raised inside the window