c# - 创建一个将流读取到最后的 Observable 的正确方法是什么

标签 c# system.reactive

我在这里挣扎。通常我会读一本书,但现在还没有。我发现了无数与使用 RX 读取流有关的各种事情的例子,但我发现很难理解。

我知道我可以使用 Observable.FromAsyncPattern 来创建 Stream 的 BeginRead/EndRead 或 BeginReadLine/EndReadLine 方法的包装器。

但这只会读取一次——当第一个观察者订阅时。

我想要一个 Observable,它将继续读取和抽取 OnNext,直到流出错或结束。

除此之外,我还想知道如何与多个订阅者共享该 observable,以便他们都能获得元素。

最佳答案

您可以使用Repeat 来保持阅读行直到流结束,并使用PublishReplay 来控制共享多个读者。

一个简单、完整的 Rx 解决方案示例,用于从任何流中读取行直到结束:

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}

此解决方案还利用了 ReadLine 在到达流末尾时返回 null 的事实。

关于c# - 创建一个将流读取到最后的 Observable 的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14454766/

相关文章:

system.reactive - subscribeOn 和 observeOn 应该只由最终订阅者调用吗?

c# - 基于推和基于拉的结构(如 IEnumerable<T> 和 IObservable<T>)之间有什么区别

c# - 使用 RX 跟踪可观察集合中的多个项目

c# - 在 C# 中使用 PowerShell 导入 CSV 添加列

c# - 在响应式扩展中包装一个文件观察器

.net - 为什么 Windows\Assembly 中可能缺少 System.Threading.dll?

c# - Mongo 'find' 方法不适用于 DateTime.MinValue

c# - HTML.DropDownListFor DropDownList

c# - .net 4.5中不存在使用System.Json的情况

c# - 将键值与 LinQ 分开