postgresql - 使用 slick 的 3.0.0 流式结果和 Postgresql 的正确方法是什么?

标签 postgresql scala slick slick-3.0 akka-stream

我正在尝试弄清楚如何使用 slick streaming。我使用带有 postgres 驱动程序的 slick 3.0.0

情况如下:服务器必须给客户端数据序列拆分成受大小(以字节为单位)限制的 block 。所以,我写了以下巧妙的查询:

val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))

我将 seq 与 akka-streams Source 相结合,编写了自定义 PushPullStage,它限制了数据的大小(以字节为单位)并在达到大小限制时完成上游。它工作得很好。问题是 - 当我查看 postgres 日志时,我看到了这样的查询 从 user_id = 0 和 timestamp > 0 按时间戳排序的序列中选择 *;

因此,乍一看,似乎正在进行大量(且不必要的)数据库查询,但每次查询仅使用几个字节。使用 Slick 进行流式传输的正确方法是什么,以最大限度地减少数据库查询并充分利用每个查询中传输的数据?

最佳答案

使用 Slick 和 Postgres 进行流式传输的“正确方法”包括三件事:

  1. 必须使用 db.stream()

  2. 必须在 JDBC 驱动程序中禁用 autoCommit。一种方法是通过添加后缀 .transactionally 使查询在事务中运行。

  3. 必须将 fetchSize 设置为 0 以外的值,否则 postgres 将一次性将整个结果集推送到客户端。

例如:

DB.stream(
  find(0L, 0L)
    .transactionally
    .withStatementParameters(fetchSize = 1000)
).foreach(println)

有用的链接:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

关于postgresql - 使用 slick 的 3.0.0 流式结果和 Postgresql 的正确方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31340507/

相关文章:

sql - 在 Postgres 中将时间戳截断为 5 分钟的最快方法是什么?

python - 修复 Python 的 Pg 中的类型错误

sql - Postgres 整数类型的下限超出范围?

scala - SparkContext textFile的InputPath语法

scala - 这是scala专门的错误吗?

scala - Slick 2.1.0 中可选的嵌套映射实体

postgresql - flask 无法将 postgresql 与 docker-compose 连接

scala - 为什么用模式匹配收集不能缩小特定类别?

postgresql - 优化数据库操作 Slick 3

sql - Scala Slick 2.0 与 PostgreSQL。比较时间戳