mongodb - CosmosDB - Mongodb IsUpsert 不适用于批量更新

标签 mongodb azure azure-cosmosdb

过去几个月,我们通过 .NET Core 和最新的 MongoDB.Driver Nuget 包 (2.11.0) 广泛使用了 MongoDB API 和 CosmosDB(服务器 v3.6)。

批量插入和插入工作正常,但不幸的是,我无法让批量操作与IsUpsert=true一起使用。模式。

Note:

  • We use Polly to manage rate limiting. As part of this, we handle MongoWriteException, MongoExecutionTimeoutException, MongoCommandException and MongoBulkWriteExceptions.
  • This issue can be observed for both sharded/non-sharded collections.

具体来说,给定一个非分片输入文档列表 List<T> documents ,以下工作正常:

  1. 批量插入:

    await Collection.BulkWriteAsync(documents.Select(s => new InsertOneModel<T>(s)),...)
    
  2. 批量更新:

    await Collection.BulkWriteAsync(documents.Select(s =>
      new ReplaceOneModel<T>(Builders<T>.Filter.Eq("Id", item.Id), item) { IsUpsert = false }),...)
    

不幸的是,如果某些文档是新文档,我们应该能够按原样使用上面的批量更新代码 - 只需将 IsUpsert 标志设置为 true...但是可惜,这不起作用。

具体来说,给定 50 个现有文档和 50 个新文档:

  • 如果文档的 ID 类型为 ObjectId作为主键,对于它处理的第一个新文档,CosmosDb 将错误地将其插入 Id=ObjectId("000000000000000000000000") - 届时将不会插入/更新更多文档。在这种情况下:
    • BulkWriteResult返回 MatchedCount=65, ModifiedCount=65, ProcessedRequests=100, RequestCount=100, Upserts=1, IsAcknowledged=true, IsModifiedCountAvailable=true, InsertedCount=0
    • 没有抛出异常。
    • 注意 - 数据库中只有 51 个文档,因此不能依赖 BulkWriteResult
  • 如果文档的 ID 类型为 int作为主键然后 cosmos db 似乎
    • 在某个随机点放弃处理文档。这看起来更像是一种速率限制类型的场景...除了不抛出异常
    • 例如,更新所有 50 个文档,但只插入 8 个文档。在本例中,BulkWriteResult返回 MatchedCount=50, ModifiedCount=50, ProcessedRequests=100, RequestCount=100, Upserts=8, IsAcknowledged=true, IsModifiedCountAvailable=true, InsertedCount=0 .

我错过了什么? ObjectId场景似乎完全被破坏了;另一种情况可以编码,但这里没有引发异常似乎不正确。

最佳答案

对于遇到此问题的其他人来说 - 解决方法远非直截了当,但这就是我最终所做的。

  • 主键为 ObjectId 的文档:只要您根据标识符的值为 CosmosDB 或不是。但是,您可能仍然需要处理下面提到的因一个错误而被关闭
  • 主键不是 MongoDB 的文档:绝对是 ReplaceOneModel<> 中的错误,因为我无法在官方 ObjectId.Empty 实现中重现此场景。为了解决这个问题,我必须应用以下两个解决方法:
    • 抛出自定义异常并更新我现有的 ObjectId 策略以重试未处理的请求,就像我通常处理通常由 CosmosDb 引发的其他 MongoDB 速率限制异常一样。示例代码:
    BulkWriteResult<T> bulkWriteResult = await Collection
     .BulkWriteAsync(
         remainingWork,
         new BulkWriteOptions { BypassDocumentValidation = true },
         token);
    
    var actuallyProcessed = bulkWriteResult.DeletedCount + bulkWriteResult.InsertedCount +
                         bulkWriteResult.ModifiedCount + bulkWriteResult.Upserts?.Count;
    if (actuallyProcessed < bulkWriteResult.ProcessedRequests.Count)
    {
        // Off by one error: OCCASIONALLY, the last one processed is not actually processed
        // No way to detect this, unfortunately - hence the adjustment by 1
        actuallyProcessed = actuallyProcessed > 1 ? actuallyProcessed - 1 : 0;
        var processed = bulkWriteResult.ProcessedRequests.Take((int)actuallyProcessed)
            .ToList().AsReadOnly();
        var unprocessed = bulkWriteResult.ProcessedRequests.Skip((int)actuallyProcessed)
            .ToList().AsReadOnly();
        throw new CosmosDbRateLimitingBugException<T>(unprocessed, processed, bulkWriteResult);
    }
    
  • 差一错误处理。不确定纯 Polly 实现中是否需要这样做,但就像上面一样,有时您还必须将已处理的记录调整 1。注意:无论使用“IsUpsert=true”,此问题都适用。下面的代码稍微简化了,因为我使用 MongoDB 来跟踪异常和已处理/未处理的记录(未显示)。这里 CosmosDB 是必须向下一个 MongoDB 调用发出的 Polly.Context 请求。
if (exception is MongoBulkWriteException<T> mostRecentException)
{
    var unProcessedRequests =
        mostRecentException.UnprocessedRequests.ToList();
    if (mostRecentException.WriteErrors.Any())
    {
        //get processed requests (without success) that failed and add to remainingWork
        var requestWithError = new[]
            {
                mostRecentException.Result.ProcessedRequests[
                    mostRecentException.WriteErrors[0].Index]
            };
        unProcessedRequests = unProcessedRequests.Concat(requestWithError).ToList();
    }

    remainingWork = unProcessedRequests.ToList();
}
else if (exception is CosmosDbRateLimitingBugException<T> cosmosDbBug)
{
    remainingWork = cosmosDbBug.UnprocessedRequests;
}

关于mongodb - CosmosDB - Mongodb IsUpsert 不适用于批量更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63351358/

相关文章:

mongodb - 使用 mongodb/mongo-go-driver 和 Azure CosmosDB 的 ModifiedCount 和 MatchedCount 不正确

java - 如何使用JAVA从Azure Cosmos DB集合之一获取所有文档?

c# - 如何根据内部 json 对象值在 DocumentDB 中查询?

javascript - Stripe/node.js : how retrieve stripe subscription safely + increment 1

node.js - Mongoose 无法正确排序数字

c# - 如何制作整数 ID 生成器?

node.js - 如何严格指定返回类型 Angular4/ionic3

c# - Azure 中与 MongoDB 的离线数据同步

azure - 是否可以将基于 .exe 的简单计算器工具移动到云以供多用户使用?

mongodb - Terraform 选择在 Azure 中使用免费层 cosmosDB 帐户