强调文本我正在尝试使用 Kotlin Flow 异步并行处理一些数据,并在响应发生时将响应流式传输到客户端,而不是等到所有作业完成。
尝试发送 flow
失败后本身到响应,如下所示:call.respond(HttpStatusCode.OK, flow.toList())
...我修补了几个小时试图弄清楚,并想出了以下内容。它是否正确?似乎应该有一种更惯用的发送 Flow<MyData>
的方式作为回应,就像 Flux<MyData>
一样在 Spring Boot 中。
另外,当 HTTP 请求被取消时,使用下面的方法似乎不会取消 Flow,那么如何在 Ktor 中取消它呢?
data class MyData(val number: Int)
class MyService {
fun updateAllJobs(): Flow<MyData> =
flow {
buildList { repeat(10) { add(MyData(Random.nextInt())) } }
// Docs recommend using `onEach` to "delay" elements.
// However, if I delay here instead of in `map`, all elements are held
// and emitted at once at the very end of the cumulative delay.
// .onEach { delay(500) }
.map {
// I want to emit elements in a "stream" as each is computed.
delay(500)
emit(it)
}
}
}
fun Route.jobRouter() {
val service: MyService by inject() // injected with Koin
put("/jobs") {
val flow = service.updateAllJobs()
// Just using the default Jackson mapper for this example.
val mapper = jsonMapper { }
// `respondOutputStream` seems to be the only way to send a Flow as a stream.
call.respondOutputStream(ContentType.Application.Json, HttpStatusCode.OK) {
flow.collect {
println(it)
// The data does not stream without the newline and `flush()` call.
write((mapper.writeValueAsString(it) + "\n").toByteArray())
flush()
}
}
}
}
最佳答案
我能找到的最佳解决方案(尽管我不喜欢它)是使用 respondBytesWriter
将数据写入响应正文 channel 。在处理程序中,启动一个收集流的新作业,以便在写入 channel 关闭时能够取消它(HTTP 请求被取消):
fun Route.jobRouter(service: MyService) {
put("/jobs") {
val flow = service.updateAllJobs()
val mapper = jsonMapper {}
call.respondBytesWriter(contentType = ContentType.Application.Json) {
val job = launch {
flow.collect {
println(it)
try {
writeStringUtf8(mapper.writeValueAsString(it))
flush()
} catch (_: ChannelWriteException) {
cancel()
}
}
}
job.join()
}
}
}
关于kotlin - 如何在 Ktor 流式响应中正确使用 Kotlin Flow?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73178416/