java - Spring Data 与 JPA 2.2 resultStream 到 Kotlin 的 Flow

标签 java jpa kotlin spring-data-jpa kotlin-coroutines

在 JPA 2.2 之前,如果我想将 ScrollableResults 发送到 Kotlin 的 Flow ,我必须这样做:

  override fun findSomeUsers(batch: Int): Flow<User> {
    return flow {
      (em.delegate as Session).sessionFactory.openSession().use { session ->
        val query = session.createQuery("select u from User u where ...")
        query.fetchSize = batch
        query.isReadOnly = true

        query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
          while (results.next()) {
            val u = results.get(0) as User
            emit(u)
          }
        }
      }
    }
  }

我必须将 EntityManager 向下转换为 Hibernate 的 Session

但是由于 JPA 2.2 的 Query 支持 getResultStream ,因此应该有一种更简洁的方法来实现此目的:

  @ExperimentalCoroutinesApi
  override fun findSomeUsers(batchSize: Int): Flow<User> {
    return channelFlow {
      em.createQuery("select u from User u where ...")
        .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
        .unwrap(javax.persistence.Query::class.java)
        .resultStream
        .asSequence()
        .map { it as User }
        .forEach { u ->
          runBlocking {
            send(u)
          }
        }
    }
  }

嗯,效果很好,但有些可疑。

首先,为什么我不能只编写 resultStream.asSequence.map {it as User}.asFlow() ? (客户端无任何反应结束)

其次,runBlocking block 很难看。 runBlocking 只能在测试中使用。但我在代码中没有找到绕过它的方法。

有什么办法可以解决吗?

第三,与问题无关。看来Spring-Data-JPA仍然不支持这样的查询方法:

  @Query("select u from User u where ...") 
  @MaybeSomeQueryHint(batchSize=:batchSize)
  fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>

它加载所有用户,然后提示重复的行...

客户端(测试)端代码就像这样简单:

  @ExperimentalCoroutinesApi
  @Test
  @Transactional
  open fun testUsers() {
    runBlocking {
      userDao.findSomeUsers(100).collectIndexed { index, u: User ->
        logger.info("[{}] {}", index , u)
      }
    }
  }

致@Marko,Stream 版本运行良好:

  override fun findSomeUserStream(batchSize: Int): Stream<User> {
    return em.createQuery("select u from User u where ...")
      .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
      .unwrap(javax.persistence.Query::class.java)
      .resultStream
      .map { it as User }
  }


  @Transactional // without this annotation , "Operation not allowed after ResultSet closed" will be thrown
  @Test
  open fun testUserStream() {
    runBlocking {
      userDao.findSomeUserStream(100).forEach { u ->
        logger.info("{}" , u)
      }
    }
  }


  // it works !!
  @Transactional
  @Test
  open fun testUserStream2() {
    runBlocking {
      userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
        logger.info("{}" , u)
      }
    }
  }

最佳答案

不要修补 Stream.toSequence() 的结果,而是定义 StreamFlow 的转换:

fun <T> Stream<T>.asFlow() = flow {
    for (t in iterator()) {
        emit(t)
    }
}

如果您将其与此代码示例一起使用:

suspend fun main() {
    Stream.of("a", "b")
            .asFlow()
            .collect { println(it) }
}

它将打印

a
b

你的函数应该是这样的:

override fun findSomeUsers(batchSize: Int): Flow<User> {
    return em.createQuery("select u from User u where ...")
            .setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
            .unwrap(javax.persistence.Query::class.java)
            .resultStream
            .asFlow()
            .map { it as User }
}

关于java - Spring Data 与 JPA 2.2 resultStream 到 Kotlin 的 Flow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58700235/

相关文章:

kotlin - RxJava Zip 可观察可迭代

java - 索引越界。应如何将 resultSet 中的数据存储到 ArrayList 中?

java - 在同一台机器上调试多个Socket IP地址

java - 使用数组创建字符串

java - 如何从 CollectionTable 中删除数据 | hibernate (日本)

reflection - Kotlin 获取字段注释始终为空

java - 如何进行Spring JDBC连接并检索数据?

java - 现实世界中的 ORM

java - QueryDSL:QueryDSL 2.9.0 中 ConstructorExpression 和 CaseBuilder 的 JPA 示例

android - java.lang.ClassNotFoundException : com. intellij.junit5.JUnit5IdeaTestRunner 在 kotlin 中使用 spek