c# - 如何防止 Rx 测试挂起?

标签 c# system.reactive rx.net

我正在使用下面的简化测试用例重现我的 Rx 问题。下面的测试挂起。我确信这是一件很小但很重要的事情,我错过了,但无法指出它。

   public class Service
   {
        private ISubject<double> _subject = new Subject<double>();
        public void Reset()
        {
            _subject.OnNext(0.0);
        }

        public IObservable<double> GetProgress()
        {
            return _subject;
        }
    }

    public class ObTest
    {
        [Fact]
        private async Task SimpleTest()
        {
            var service = new Service();

            var result = service.GetProgress().Take(1);

            var task = Task.Run(async () =>
            {
                service.Reset();
            });

            await result;
        }
    }

更新

我上面的尝试是稍微简化问题并理解它。就我而言GetProgress()是各种 Observables 的合并发布下载进度,其中之一 ObservablesSubject<double>发布0每当有人调用删除下载的方法时。

EnigmativityTheodor Zoulias识别的竞争条件可能(??)在现实生活中发生。我显示了一个试图获取进度的 View ,但是,快速的手指及时删除了它。

我需要多了解一点的是,如果再次开始下载(通过显示已经订阅的 View ,订阅现在已经发生)并且有人再次删除它。

   public class Service
   {
        private ISubject<double> _deleteSubject = new Subject<double>();
        public void Reset()
        {
            _deleteSubject.OnNext(0.0);
        }

        public IObservable<double> GetProgress()
        {
            return _deleteSubject.Merge(downloadProgress);
        }
    }

最佳答案

您的代码没有挂起。它正在等待一个有时永远不会获得值的可观察值。

你有竞争条件。

Task.Run 有时会在 await result 创建可观察对象的订阅之前执行完成 - 因此它永远看不到该值。

试试这个代码:

private async Task SimpleTest()
{
    var service = new Service();

    var result = service.GetProgress().Take(1);

    var awaiter = result.GetAwaiter();

    var task = Task.Run(() =>
    {
        service.Reset();
    });

    await awaiter;
}

关于c# - 如何防止 Rx 测试挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59188126/

相关文章:

c# - 代码首次迁移中更新数据库命令出错

c# - 在 View 中显示 Google Analytics 数据

c# - Unity3d - 获取旋转物体的游戏对象高度

wpf - 使用 Rx 合并两个可观察集合并绑定(bind)到列表框

c# - 获取包含可观察属性的对象列表的单个可观察值?

c# - 如何让Rx回调在ThreadPool上运行?

c# - 将 JSON 对象转换为 JSON 数组时出错

c# - Rx 处置订阅

system.reactive - 我的 "zipLatest"运算符已经存在吗?

c# - 使用 Reactive Extensions (RX),是否可以添加 "Pause"命令?