c# - 拆分 IObservable<byte[]> 到字符然后到行

标签 c# system.reactive

Rx 很棒,但有时很难找到做某事的优雅方式。这个想法很简单。我收到带有字节 [] 的事件,此数组可能包含一行的一部分、多行或一行。我想要的是找到一种方法让 IObservable of Line 所以 IObservable<String> ,其中序列的每个元素都是一行。

几个小时后,我找到的最接近的解决方案非常难看,当然也行不通,因为扫描会在每个字符上触发 OnNext :

//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    //Scan doesn't work it trigger OnNext on every char
    //Aggregate doesn't work neither as it doesn't return intermediate result
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);


Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
            //Data is a byte[]
            .Select(_ => _.EventArgs.Data)
            .Subscribe(array => array.ToObservable()
            //Convert into char
            .ForEach(c => outputStream.OnNext((char)c)));

备注:_reactiveSubcription应该是 IObservable<String> .

如果不考虑字符编码问题,我缺少什么来完成这项工作?

最佳答案

这对我有用。

首先,将 byte[] 转换为字符串并在 \r 上拆分字符串(Regex Split 保留分隔符)。

现在有一个字符串流,其中一些以 \r 结尾.

然后 Concat,以保持它们的顺序。此外,由于 strings下一步需要热门,发布它们。

var strings = bytes.
  Select(arr => (Regex.Split(Encoding.Default.GetString(arr, 0, arr.Length - 1), "(\r)")).
    Where(s=> s.Length != 0).
    ToObservable()).
  Concat().
  Publish().
  RefCount();

创建一个字符串窗口,当字符串以 \r 结尾时结束. strings需要很热,因为它同时用于窗口内容和窗口结束触发器。

var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));

将每个窗口聚合成一个字符串。

var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));

linesIObservable<String>每个字符串包含一行。

为了对此进行测试,我使用了以下生成器来生成 IObservable<byte[]>

var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
    Return((byte)('A' + i)).
    Repeat(24).
    Concat(Observable.
        Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());

关于c# - 拆分 IObservable<byte[]> 到字符然后到行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31208418/

相关文章:

c# - 设置加载指示器的位置

c# - Excel 插件 COM 插件 VS VSTO 插件

c# - 在 C# 中链接多个相同方法的调用

system.reactive - 如何在 Observable.Do 扩展方法中传递异步方法?

c# - 在 c# 中是否有用于可观察对象的 if/then/else 运算符可用?

c# - 在 Android 或 iOS 平台中,应该使用哪个 Rx 调度程序来观察主线程?

c# - 检查 XML 元素是否等于另一个 XML 元素,忽略空值

c# - 如何使用 ASP.NET 在负载平衡服务器上存储和检索上传的图像?

c# - WinForms/C# : Adding items to Combox and controlling the Item value (numeric)

c# - Rx 框架 : execute an action on timeout without interrupting the original observable sequence