c# - Observable.Range 被重复?

标签 c# system.reactive reactive-programming

Rx 新手——我有一个序列,除了看起来重复之外,似乎运行正常。

我认为我遗漏了对 Select()SelectMany() 的调用,这些调用触发了重新评估的范围。

代码说明和我想要做什么

  • 对于所有数字,循环执行检索数据的方法(从数据库分页)。
  • 最终,该数据将为空(我只想在检索数据时继续处理
  • 对于检索到的每条记录,我只想处理应该处理的记录
  • 对于应处理的内容,我希望并行处理最多 x 个内容(根据设置)。
  • 我想等到整个序列完成后再退出该方法(因此在最后调用 wait )。

下面的代码有问题

  • 我使用一个数据集运行代码,我知道该数据集只有 1 个项目。
    • 因此,第 0 页返回 1 个项目,第 1 页返回 0 个项目。
  • 我的期望是该流程针对一个项目运行一次。
  • 但是,我发现页面 0 和 1 都被调用了两次,因此该进程运行了两次。

我认为这与导致范围从 0 开始重新计算的调用有关,但我无法弄清楚它是什么。

代码

var query = Observable.Range(0, int.MaxValue)
    .Select(pageNum =>
        {
            _etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
            return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
        })
    .TakeWhile(resProfList => resProfList.Any())
    .SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
    .Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
    .Merge(maxConcurrent: _processorSettings.ParallelProperties)
    .Do(async trackingRequests =>
    {
        await CreateRequests(trackingRequests.Result, createTrackingPayload);

        var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
        var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
        var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
            TrackingRecordRequestType.UpdateAssignmentType);

        _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
            numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
    });

var subscription = query.Subscribe(
trackingRequests =>
{
    //Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
    _etlLogger.Info("Finished! Woohoo!");
});

await query.Wait();

最佳答案

这是因为您订阅了该序列两次。一次在 query.Subscribe(...) 处,再次在 query.Wait() 处。

Observable.Range(0, int.MaxValue) 是一个冷可观察值。每次订阅都会重新评估。您可以通过使用 Publish() 发布可观察对象,然后订阅它,然后 Connect(),然后 Wait() 。如果您在最后一个元素已经生成后调用 Wait(),这确实会增加获得 InvalidOperationException 的风险。更好的替代方案是 LastOrDefaultAsync()

这会给你带来这样的结果:

var connectable = query.Publish();
var subscription = connectable.Subscribe(...);
subscription = new CompositeDisposable(connectable.Connect(), subscription);
await connectable.LastOrDefaultAsync();

或者您可以避免等待并直接使用 ToTask() 返回任务(请从方法签名中删除异步)。

return connectable.LastOrDefaultAsync().ToTask();

一旦转换为任务,您可以使用 Wait() 同步等待它(不要将 Task.Wait()Observable.Wait( ))。

connectable.LastOrDefaultAsync().ToTask().Wait();

但是,您很可能根本不想等待!在异步上下文中等待没有什么意义。您应该做什么,将序列完成后需要运行的剩余代码放入订阅的 OnComplete() 部分中。如果您有即使在取消订阅 (Dispose) 时也需要运行的(清理)代码,请考虑 Observable.UsingFinally(...) 方法以确保此代码已运行。

关于c# - Observable.Range 被重复?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31518230/

相关文章:

c# - 如何升级在 Windows 故障转移群集上运行的通用应用程序

c# - 无法将套接字重新绑定(bind)到现有 IP/端口组合

java - 在 Android 中使用 RX2 时出现 NetworkOnMainThreadException

javascript - 为什么在 Redux-Saga 上使用 Redux-Observable?

angular - 使用异步代码扫描操作符

java - 在 RxJava 中将 Observable<List<Car>> 转换为 Observable<Car> 序列

c# - 在 .NET 中单击确认对话框 Selenium

c# - 通过 Socket 直接使用 TcpClient 有什么好处?

c# - 有没有办法听到 Reactive Extensions 中没有引发任何事件?

c# - Nuget 上的 System.Reactive/Rx .NET 4.0