我按照此示例代码 ( https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet#example ) 注册一个观察者来处理 cosmos db 集合中的更改源。 我正在使用实用程序在 cosmos db 集合中创建新文档(例如在 for 循环中创建 400 个文档)。 我正在使用 30 秒的 FeedPollDelay。但 CFP lib 似乎并没有尊重它。即使在 Feed 轮询延迟间隔到期之前,也会重复调用 ProcessChangesAsync 方法。 第一批检索了约 60 个文档,第二批检索了约 20 个文档,第三批检索了约 100 个文档。
DocumentCollectionInfo feedCollectionInfo = new DocumentCollectionInfo()
{
DatabaseName = databaseName,
CollectionName = monitoredCollectionName,
Uri = new Uri(uri),
MasterKey = masterKey
};
DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo()
{
DatabaseName = databaseName,
CollectionName = leaseCollectionName,
Uri = new Uri(uri),
MasterKey = masterKey
};
ChangeFeedProcessorOptions feedProcessorOptions = new ChangeFeedProcessorOptions()
{
FeedPollDelay = TimeSpan.FromSeconds(30)
//LeasePrefix = Guid.NewGuid().ToString(),
//MaxItemCount = 100
};
ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder();
processor = await builder
.WithHostName(hostName)
.WithFeedCollection(feedCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithProcessorOptions(feedProcessorOptions)
.WithObserver<LiveWorkItemChangeFeedObserver>()
.BuildAsync();
await processor.StartAsync();
第一批收到 60 份文档就可以了。但我预计在 Feed 轮询延迟(30 秒)间隔到期后,将在单个批处理中调用第二批,并包含其余 340 个文档。
但是 ProcessChangesAsync 方法会频繁触发,并且此选项未得到遵守。
最佳答案
当更改源处理器读取更改源并且没有发现新更改时(而不是在每个批处理之间),使用 FeedPollDelay。
流程示例:
- CFP 轮询更改,发现 X。
- 使用 X 调用 ProcessChangesAsync
- ProcessChangesAsync 完成后,CFP 立即轮询更改,找到 Y。
- 使用 Y 调用 ProcessChangesAsync。
- ProcessChangesAsync 完成后,CFP 立即轮询更改,但什么也没发现,然后等待 FeedPollDelay。
- CFP 轮询变化,发现 Z。
- 使用 Z 调用 ProcessChangesAsync
- ProcessChangesAsync 完成后,CFP 立即轮询更改,但什么也没发现,然后等待 FeedPollDelay。
- 等等……
关于azure - 更改源处理器库不支持 ChangeFeedProcessorOptions FeedPollDelay/CheckPointFrequency,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55956916/