Rx 新手——我有一个序列,除了看起来重复之外,似乎运行正常。
我认为我遗漏了对 Select()
或 SelectMany()
的调用,这些调用触发了重新评估的范围。
代码说明和我想要做什么
- 对于所有数字,循环执行检索数据的方法(从数据库分页)。
- 最终,该数据将为空(我只想在检索数据时继续处理
- 对于检索到的每条记录,我只想处理应该处理的记录
- 对于应处理的内容,我希望并行处理最多
x
个内容(根据设置)。 - 我想等到整个序列完成后再退出该方法(因此在最后调用 wait )。
下面的代码有问题
- 我使用一个数据集运行代码,我知道该数据集只有 1 个项目。
- 因此,第 0 页返回 1 个项目,第 1 页返回 0 个项目。
- 我的期望是该流程针对一个项目运行一次。
- 但是,我发现页面 0 和 1 都被调用了两次,因此该进程运行了两次。
我认为这与导致范围从 0 开始重新计算的调用有关,但我无法弄清楚它是什么。
代码
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum =>
{
_etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
})
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
.Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
.Merge(maxConcurrent: _processorSettings.ParallelProperties)
.Do(async trackingRequests =>
{
await CreateRequests(trackingRequests.Result, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
});
var subscription = query.Subscribe(
trackingRequests =>
{
//Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();
最佳答案
这是因为您订阅了该序列两次。一次在 query.Subscribe(...)
处,再次在 query.Wait()
处。
Observable.Range(0, int.MaxValue)
是一个冷可观察值。每次订阅都会重新评估。您可以通过使用 Publish()
发布可观察对象,然后订阅它,然后 Connect()
,然后 Wait()
。如果您在最后一个元素已经生成后调用 Wait()
,这确实会增加获得 InvalidOperationException
的风险。更好的替代方案是 LastOrDefaultAsync()
。
这会给你带来这样的结果:
var connectable = query.Publish();
var subscription = connectable.Subscribe(...);
subscription = new CompositeDisposable(connectable.Connect(), subscription);
await connectable.LastOrDefaultAsync();
或者您可以避免等待并直接使用 ToTask()
返回任务(请从方法签名中删除异步)。
return connectable.LastOrDefaultAsync().ToTask();
一旦转换为任务,您可以使用 Wait()
同步等待它(不要将 Task.Wait()
与 Observable.Wait( )
)。
connectable.LastOrDefaultAsync().ToTask().Wait();
但是,您很可能根本不想等待!在异步上下文中等待没有什么意义。您应该做什么,将序列完成后需要运行的剩余代码放入订阅的 OnComplete()
部分中。如果您有即使在取消订阅 (Dispose) 时也需要运行的(清理)代码,请考虑 Observable.Using
或 Finally(...)
方法以确保此代码已运行。
关于c# - Observable.Range 被重复?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31518230/