reactive-programming - 响应式(Reactive)管道中的后台任务(即发即忘)

标签 reactive-programming spring-webflux project-reactor

我有一个响应式(Reactive)管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing )。
完成后,我需要将发生的事情通知一些外部服务。管道的那部分应该不增加整体响应时间 .
此外,通知这个外部系统对业务来说并不重要:在管道的主要部分完成后做出快速响应比确保通知成功更重要。
据我所知,在不减慢整个过程的情况下在后台运行某些东西的唯一方法是直接在管道中订阅,从而实现即发即忘的心态。
flatmap 内订阅是否有更好的选择? ?
我有点担心如果通知外部服务的时间比原始处理时间长,并且同时有大量请求传入,会发生什么情况。这会导致内存耗尽或整个进程阻塞吗?

fun runPipeline(incoming: Mono<Request>) = incoming
    .flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
    .flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical

fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing

fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
    val notification = "notification" // build an object from context

    // this uses non-blocking HTTP (i.e. webclient), so it can take a second or so 
    notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()

    Mono.just(Unit)
}

fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while

最佳答案

我回答这个假设您使用纯粹的 react 机制通知外部服务 - 即您没有包装阻塞服务。如果您是,那么答案会有所不同,因为您受到有界弹性线程池大小的限制,如果您每秒收到数百个请求,该线程池可能很快就会不堪重负。
(假设您使用的是 react 机制,那么您在示例中给出的 .subscribeOn(Schedulers.boundedElastic()) 就没有必要了,因为这不会给您带来任何好处——它专为包装遗留阻塞服务而设计。)

Could this lead to a memory exhaustion


只有在非常极端的情况下才有可能,每个单独请求使用的内存将很小。几乎肯定不值得担心,如果您在这里开始看到内存问题,那么您几乎肯定会在其他地方遇到其他问题。
话虽如此,我可能会建议添加 .timeout(Duration.ofSeconds(5))或类似之前您的内部订阅方法,以确保请求在一段时间后被终止,如果它们因任何原因没有工作 - 这将防止它们堆积。

...or [can this cause] the overall process to block?


这个更容易 - 简短的不,它不能。

关于reactive-programming - 响应式(Reactive)管道中的后台任务(即发即忘),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69358013/

相关文章:

haskell - react 性香蕉中的 'Latch' 和 'Pulse' 是什么?

kotlin - 为什么 Sinks.many().multicast().onBackpressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它

java - 在应用程序运行期间动态地将事件/值推送到 Flux

python-3.x - 在python3中合并异步迭代

java - 如何在Rxjava中按键合并两个Observable?

spring - 如何在 Postman 中查看 Spring 5 Reactive API 的响应?

java - 返回 Mono<Void> 后的方法调用

java - 我可以将 Spring WebFlux 应用程序部署为 WAR

spring - 如何使 AuditorAware 与 Spring Data Mongo Reactive 一起工作

r - 在适用于 R 的 Shiny 应用程序中,如何延迟响应式的触发?