java - 如何将通量链接到另一个通量/单声道并施加另一个背压?

标签 java reactive-programming project-reactor

我有以下在 react 器核心中使用通量的响应式(Reactive)代码:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> redisHashReactiveCommands.hmset(key, map))
    //.flatMap(... //want to store same data async into kafka with its own back pressure handling)
    .subscribeOn(Schedulers.parallel())
    .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
    .doOnComplete(() -> log.debug("On completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

如您所见,我对流程的外部源进行了背压处理 (FluxSink.OverflowStrategy.LATEST)。但是,我还想为我的进程配置背压到 redis (redisHashReactiveCommands.hmset(key, map)),因为它可能是比我的进程的外部源更大的瓶颈。我预计我需要为 redis 部分创建另一个 flux 并将其与该 flux 链接,但我该如何实现这一点,因为 .flatMap 适用于单个项目而不是项目流?

此外,我也想将相同的发射项存储到 Kafka 中,但是链接 flapMap 似乎不起作用.. 有没有一种简单的方法可以在一组函数调用中将所有这些链接在一起(外部源 -> 我的进程,我的进程 -> redis,我的进程 -> kafka)?

最佳答案

如果您对主序列中的结果对象不感兴趣,您可以将flatMap 中的两个保存结合起来。您必须移动 subscribeOn 并登录 flatMap 以及将它们放在内部保存发布者上:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> Mono.when(
        redisHashReactiveCommands.hmset(key, map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),

        kafkaReactiveCommand.something(map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
    ))
    //... this results in a Mono<Void>
    .doOnComplete(() -> log.debug("Both redis and kafka completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

或者,如果您确定两个进程都会发出一个结果元素或一个错误,您可以通过替换 when< 将这两个结果合并到一个 Tuple2zip

关于java - 如何将通量链接到另一个通量/单声道并施加另一个背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53138423/

相关文章:

java - Reactor 有没有办法忽略错误信号?

静态修改器方法的 Java 命名约定

macos - Mac OS X 上多个 JDK 的含义

java - 使用Mysql从数据库中检索时间戳数据

java - 如何获得写在一行中的两个数字 - 是否有 Split() 方法?

javascript - 捕获所有按键输入,直到以响应式(Reactive)编程方式按下 ENTER

objective-c - 如何使用 ReactiveCocoa 简化我的嵌套 for 循环?

c# - 如何在长时间运行的查询中扩展 Throttle Timespan?

spring-webflux - 如何在不为空时重复单声道

project-reactor - 如何在spring reactor中将两个发布者合二为一