我需要在 Rx 中使用多个一次性资源。这就是我嵌套 Observable.Using 语句的方式(内部源代码仅用于测试)。
var obs = Observable.Using(
() => new FileStream("file.txt", FileMode.Open),
fs => Observable.Using(
() => new StreamReader(fs),
sr => Observable.Create<string>(o =>
TaskPoolScheduler.Default.ScheduleAsync(async (sch, ct) =>
{
while (!ct.IsCancellationRequested)
{
var s = await sr.ReadLineAsync().ConfigureAwait(false);
if (s is null) break;
o.OnNext(s);
}
o.OnCompleted();
}))));
obs.Subscribe(Console.WriteLine);
是否有更简洁的方法来使用
多个一次性资源?
最佳答案
我想不出使用
无限数量资源的通用方法,但至少您可以为 2-3 个资源的常见情况创建辅助方法。这是两个的实现:
public static IObservable<TResult> Using<TResult, TResource1, TResource2>(
Func<TResource1> resourceFactory1,
Func<TResource1, TResource2> resourceFactory2,
Func<TResource1, TResource2, IObservable<TResult>> observableFactory)
where TResource1 : IDisposable
where TResource2 : IDisposable
{
return Observable.Using(resourceFactory1, resource1 => Observable.Using(
() => resourceFactory2(resource1),
resource2 => observableFactory(resource1, resource2)));
}
使用示例:
var obs = Using(
() => new FileStream("file.txt", FileMode.Open),
(fs) => new StreamReader(fs),
(fs, sr) => Observable.Create<string>(o =>
TaskPoolScheduler.Default.ScheduleAsync(async (sch, ct) =>
{
while (!ct.IsCancellationRequested)
{
var s = await sr.ReadLineAsync().ConfigureAwait(false);
if (s is null) break;
o.OnNext(s);
}
o.OnCompleted();
})));
关于c# - 通过 Rx 正确使用多种资源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59115719/