c# - 通过 Rx 正确使用多种资源

标签 c# system.reactive rx.net

我需要在 Rx 中使用多个一次性资源。这就是我嵌套 Observable.Using 语句的方式(内部源代码仅用于测试)。

var obs = Observable.Using(
    () => new FileStream("file.txt", FileMode.Open),
    fs => Observable.Using(
        () => new StreamReader(fs),
        sr => Observable.Create<string>(o =>
            TaskPoolScheduler.Default.ScheduleAsync(async (sch, ct) =>
            {
                while (!ct.IsCancellationRequested)
                {
                    var s = await sr.ReadLineAsync().ConfigureAwait(false);
                    if (s is null) break;

                    o.OnNext(s);
                }
                o.OnCompleted();
            }))));

obs.Subscribe(Console.WriteLine);

是否有更简洁的方法来使用多个一次性资源?

最佳答案

我想不出使用无限数量资源的通用方法,但至少您可以为 2-3 个资源的常见情况创建辅助方法。这是两个的实现:

public static IObservable<TResult> Using<TResult, TResource1, TResource2>(
    Func<TResource1> resourceFactory1,
    Func<TResource1, TResource2> resourceFactory2,
    Func<TResource1, TResource2, IObservable<TResult>> observableFactory)
    where TResource1 : IDisposable
    where TResource2 : IDisposable
{
    return Observable.Using(resourceFactory1, resource1 => Observable.Using(
        () => resourceFactory2(resource1),
        resource2 => observableFactory(resource1, resource2)));
}

使用示例:

var obs = Using(
    () => new FileStream("file.txt", FileMode.Open),
    (fs) => new StreamReader(fs),
    (fs, sr) => Observable.Create<string>(o =>
        TaskPoolScheduler.Default.ScheduleAsync(async (sch, ct) =>
        {
            while (!ct.IsCancellationRequested)
            {
                var s = await sr.ReadLineAsync().ConfigureAwait(false);
                if (s is null) break;

                o.OnNext(s);
            }
            o.OnCompleted();
        })));

关于c# - 通过 Rx 正确使用多种资源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59115719/

相关文章:

c# - .net 中的 PayPal 定期付款

c# - "One Shot"Property Getters 是否被视为 .NET(或一般)中可接受的编码标准

c# - 你会用编译器即服务做什么

c# - 简单登录 C# 和 MySQL Web 应用程序

c# - 如何动态更新/添加项目到 IObservable<int> 中?

c# - 如何获取 IObservable<IObservable<T>> 的最新变化事件?

c# - Observable.Range 是否违反了 Observable 契约?

.net - 在 Rx 中聚合 ForkJoin 的结果

c# - 如何在 C# 中运行时更改 IObservable Func 谓词

c# - Rx.Net GroupBy 实现很少丢失元素序列