我正在尝试弄清楚如何使用 slick streaming。我使用带有 postgres 驱动程序的 slick 3.0.0
情况如下:服务器必须给客户端数据序列拆分成受大小(以字节为单位)限制的 block 。所以,我写了以下巧妙的查询:
val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))
我将 seq 与 akka-streams Source
相结合,编写了自定义 PushPullStage
,它限制了数据的大小(以字节为单位)并在达到大小限制时完成上游。它工作得很好。问题是 - 当我查看 postgres 日志时,我看到了这样的查询
从 user_id = 0 和 timestamp > 0 按时间戳排序的序列中选择 *;
因此,乍一看,似乎正在进行大量(且不必要的)数据库查询,但每次查询仅使用几个字节。使用 Slick 进行流式传输的正确方法是什么,以最大限度地减少数据库查询并充分利用每个查询中传输的数据?
最佳答案
使用 Slick 和 Postgres 进行流式传输的“正确方法”包括三件事:
必须使用 db.stream()
必须在 JDBC 驱动程序中禁用
autoCommit
。一种方法是通过添加后缀.transactionally
使查询在事务中运行。必须将
fetchSize
设置为 0 以外的值,否则 postgres 将一次性将整个结果集推送到客户端。
例如:
DB.stream(
find(0L, 0L)
.transactionally
.withStatementParameters(fetchSize = 1000)
).foreach(println)
有用的链接:
关于postgresql - 使用 slick 的 3.0.0 流式结果和 Postgresql 的正确方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31340507/