在 C# 控制台应用程序中,我尝试使用 System.Reactive.Linq 创建一个可观察对象,其中每个项目都是另一个可观察对象进行的某些处理的字符串结果。 我使用字符串和字符创建了一个简单的复制品。 警告,此示例完全是人为设计的,重点是嵌套的 .Wait() 挂起。
class Program
{
static void Main(string[] args)
{
string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
IObservable<string> files = fileNames.ToObservable();
string[] extensions = files.Select(fn =>
{
var extension = fn.ToObservable()
.TakeLast(4)
.ToArray()
.Wait(); // <<<<<<<<<<<<< HANG HERE
return new string(extension);
})
.ToArray()
.Wait();
}
}
同样,这不是我查找许多文件名后缀的方式。 问题是我如何生成一个字符串 Observable,其中字符串是从一个完整的 Observable 中计算出来的。
如果我提取这段代码并单独运行它,它会很好地完成。
var extension = fn.ToObservable()
.TakeLast(4)
.ToArray()
.Wait();
异步方法中嵌套的 Wait() 有一些我不明白的地方。
如何编写嵌套异步可观察对象的代码,以便生成一个简单的字符串数组?
谢谢
-约翰
最佳答案
您的代码阻塞的原因是因为您在使用 ToObservable()
时未指定调度程序。在这种情况下,它将使用 CurrentThreadScheduler
。
因此 files
可观察问题首先是 OnNext()
[A](发送 “file1.doxc”
)使用当前线程。在 OnNext()
返回之前,它无法继续迭代。然而,内部 fn
observable 也使用 ToObservable()
和 Wait()
block 直到 fn
完成 - 它会将第一个 OnNext()
(发送 “f”
)排队到当前线程调度程序,但它永远无法发送它,因为现在第一个 OnNext()
[A] 永远不会返回。
两个简单的修复:
要么像这样更改 files
observable:
IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);
或者,避免将内部 Wait()
与 SelectMany
一起使用(这绝对是更惯用的 Rx):
string[] extensions = files.SelectMany(fn =>
{
return fn.ToObservable()
.TakeLast(4)
.ToArray()
.Select(x => new string(x));
})
.ToArray()
.Wait();
// display results etc.
每种方法都有完全不同的执行语义——第一种方法运行起来很像嵌套循环,每个内部可观察对象在下一个外部迭代之前完成。由于 Wait()
的阻塞行为被移除,第二个将更加交错。如果您使用 Spy我编写的方法并将其附加在两次 ToObservable()
调用之后,您会非常清楚地看到这种行为。
关于c# - 嵌套的 Observable 卡在 Wait() 上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46016442/