MongoDB react 模板事务

标签 mongodb spring-boot kotlin transactions kotlinx.coroutines

我在我的开源项目中使用 mongodb 已经一年多了,最近我决定尝试一下事务处理。在为使用事务的方法编写了一些测试后,我发现它们会抛出一些奇怪的异常,但我无法弄清楚问题出在哪里。所以我有一个使用自定义 coroutine contextmutexmethod delete:

  open suspend fun delete(photoInfo: PhotoInfo): Boolean {
    return withContext(coroutineContext) {
      return@withContext mutex.withLock {
        return@withLock deletePhotoInternalInTransaction(photoInfo)
      }
    }
  }

然后调用 method 执行一些删除操作:

  //FIXME: doesn't work in tests
  //should be called from within locked mutex
  private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
    check(!photoInfo.isEmpty())

    val transactionMono = template.inTransaction().execute { txTemplate ->
      return@execute photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
        .flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
        .flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
    }.next()

    return try {
      transactionMono.awaitFirst()
      true
    } catch (error: Throwable) {
      logger.error("Could not delete photo", error)
      false
    }
  }

这里我有五个操作,从五个不同的文档中删除数据。以下是其中一项操作的示例:

open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
    val query = Query()
      .addCriteria(Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId))

    return template.remove(query, PhotoInfo::class.java)
      .map { deletionResult -> deletionResult.wasAcknowledged() }
      .doOnError { error -> logger.error("DB error", error) }
      .onErrorReturn(false)
  }

如果任一删除操作失败,我希望此操作失败,所以我使用事务。

然后我有一些 tests 用于使用此 delete 方法的处理程序:

  @Test
  fun `photo should not be uploaded if could not enqueue static map downloading request`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.DatabaseError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

  @Test
  fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.ServerResizeError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

  @Test
  fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.ServerDiskError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

通常第一个测试通过:

enter image description here

以下两个失败,但出现以下异常:

17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017. 

然后:

Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.

和:

17:22:36.951 [Thread-16] WARN  reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017. 

有时其中两个通过而最后一个失败。

enter image description here

看起来只有第一个事务成功,任何后续事务都会失败,我想原因是我必须手动关闭它(或 ClientSession)。但我找不到有关如何关闭交易/ session 的任何信息。 Here 是我能找到的为数不多的几个例子之一,他们在这些例子中使用了带有反应模板的交易,而且我没有看到他们做任何额外的事情来关闭交易/ session 。

或者可能是因为我在模拟一个在事务中抛出异常的方法?也许在这种情况下它没有被关闭?

最佳答案

客户端 session /事务已正确关闭,但在测试中创建的索引似乎正在获取全局锁,导致下一个事务锁落后并等待锁请求超时。

基本上,您必须管理您的索引创建,这样它们就不会干扰来自客户​​的交易。

一个快速修复方法是通过在 shell 中运行以下命令来增加锁定超时。

db.adminCommand( { setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50 } )

在生产中你可以查看事务错误标签 并重试该操作。

更多信息 https://docs.mongodb.com/manual/core/transactions-production-consideration/#pending-ddl-operations-and-transactions

关于MongoDB react 模板事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53914707/

相关文章:

node.js - Mongoose 模式更新 GeoJSON

android - Kotlin-多个动画同时出现和/或互相重复

mysql - 如何在不依赖数据库的情况下启动 spring-boot 应用程序?

java - 当您在休息调用中公开接口(interface)RequestBody时,如何将json反序列化为java对象?

rest - Spring boot RestController 不能与实现接口(interface)的类一起使用

java - 我无法向 android studio 中的 color.xml 文件添加颜色

java - 什么时候不使用 RxJava 的 Observable?

由于 WiredTiger.turtle 权限,Mongodb 无法启动

mongodb - $maxDistance 在 MongoDB 中是如何测量的?

mongodb - 你如何告诉 Mongo 在限制结果之前对集合进行排序?