kotlin - 如何在 Ktor 流式响应中正确使用 Kotlin Flow?

标签 kotlin kotlin-coroutines ktor kotlin-flow

强调文本我正在尝试使用 Kotlin Flow 异步并行处理一些数据,并在响应发生时将响应流式传输到客户端,而不是等到所有作业完成。

尝试发送 flow 失败后本身到响应,如下所示:call.respond(HttpStatusCode.OK, flow.toList())

...我修补了几个小时试图弄清楚,并想出了以下内容。它是否正确?似乎应该有一种更惯用的发送 Flow<MyData> 的方式作为回应,就像 Flux<MyData> 一样在 Spring Boot 中。

另外,当 HTTP 请求被取消时,使用下面的方法似乎不会取消 Flow,那么如何在 Ktor 中取消它呢?

data class MyData(val number: Int)

class MyService {
    fun updateAllJobs(): Flow<MyData> =
        flow {
            buildList { repeat(10) { add(MyData(Random.nextInt())) } }
                // Docs recommend using `onEach` to "delay" elements.
                // However, if I delay here instead of in `map`, all elements are held
                // and emitted at once at the very end of the cumulative delay.
                // .onEach { delay(500) }
                .map {
                    // I want to emit elements in a "stream" as each is computed.
                    delay(500)
                    emit(it)
                }
        }
}

fun Route.jobRouter() {
    val service: MyService by inject() // injected with Koin

    put("/jobs") {
        val flow = service.updateAllJobs()
        // Just using the default Jackson mapper for this example.
        val mapper = jsonMapper { }

        // `respondOutputStream` seems to be the only way to send a Flow as a stream.
        call.respondOutputStream(ContentType.Application.Json, HttpStatusCode.OK) {
            flow.collect {
                println(it)
                // The data does not stream without the newline and `flush()` call.
                write((mapper.writeValueAsString(it) + "\n").toByteArray())
                flush()
            }
        }
    }
}

最佳答案

我能找到的最佳解决方案(尽管我不喜欢它)是使用 respondBytesWriter 将数据写入响应正文 channel 。在处理程序中,启动一个收集流的新作业,以便在写入 channel 关闭时能够取消它(HTTP 请求被取消):

fun Route.jobRouter(service: MyService) {
    put("/jobs") {
        val flow = service.updateAllJobs()
        val mapper = jsonMapper {}

        call.respondBytesWriter(contentType = ContentType.Application.Json) {
            val job = launch {
                flow.collect {
                    println(it)
                    try {
                        writeStringUtf8(mapper.writeValueAsString(it))
                        flush()
                    } catch (_: ChannelWriteException) {
                        cancel()
                    }
                }
            }

            job.join()
        }
    }
}

关于kotlin - 如何在 Ktor 流式响应中正确使用 Kotlin Flow?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73178416/

相关文章:

android - 我想为某些滚动空间触发 OnScrollChangeListener

java - Android检查文件错误 “Type mismatch: inferred type is Context but Path! was expected”

android - 协程中的热流和冷流是什么,它们之间的区别是什么?

android - 在我的 KtorClient 的 DefaultRequest 中声明 ContentType = Application.Json 后,我可以更改特定请求的 ContentType header 吗

android - 当我在自定义 View 类中使用 R.styleable 时,我得到一个红色的 Unresolved reference : styleable

android - Moshi将嵌套的JSON值映射到字段

Kotlin 箭头组合经过验证的列表

android - 在 Retrofit 客户端中访问 DataStore 首选项 [HILT]

ktor - 如何以编程方式启动 ktor-server 以进行集成测试

kotlin - Ktor 在负载下延迟请求