java - 如何有选择地跳过 Flux 上的多个处理步骤

标签 java option-type flux project-reactor

我有从套接字接收的动态热数据流。 我需要检查条件,如果值匹配,则跳转到步骤3并显示新消息。

    final Flux<Msg> msgs = Flux.generate(receiver);

    final Flux<Msg> processed = msgs
        .map(this::checkCondition)  //step1
        .map(remote::doLongRunning) //optional step2
        .map(this::processFurther)  //step3
 ...

    public Msg checkCondition(Msg msg) {
        if(doCheck(msg)){
            //is there a way to jump to step3 here ?
            return new OtherMsg(msg, "someAdditionalData")) 
        } else{
            return msg
        }
    }

我能想到的唯一替代方案 - 是将 Flux 分开并将其组装回来,有没有更干净的方法?

   final Flux<Msg> msgs = Flux.generate(receiver);

        final Flux<OtherMsg> checked = msgs
            .filter(this::doCheck) //step1
            .map(msg -> new OtherMsg(msg, "someAdditionalData"));

        final Flux<OtherMsg> unchecked = msgs
            .filter(msg -> !doCheck(msg)) //step1
            .map(remote::doLongRunning);  //optional step2

        Flux.merge(checked, unchecked)
            .map(this::processFurther)  //step3


最佳答案

您无法跳过某个步骤,但可以将 flatMap() 与三元运算符结合使用来实现条件分支的形式:

final Flux<Msg> processed = msgs
        .flatMap(msg -> doCheck(msg)
            ? Mono.just(new OtherMsg(msg, "someAdditionalData")).map(remote::doLongRunning)
            : Mono.just(msg))
        .map(this::processFurther);

这样您就可以调用任何其他方法来操作三元表达式第一部分中的值,并且如果 doCheck() 返回 false,第二部分将确保它被绕过。 processFurther() 将在 flatMap() 调用之后执行,因此无论如何都会执行。

关于java - 如何有选择地跳过 Flux 上的多个处理步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57484203/

相关文章:

java - 如何使用 daimajia/AndroidImageSlider 库在图像滑动时更改 TextView 的文本?

java - java.lang.NoClassDefFoundError org.apache.hadoop.hbase.mapreduce.ImportTsv

java - Spring Boot @ConfigurationProperties 不从环境中检索属性

java - Android PopupMenu 在recycleview 中不起作用?

java - Java 8 中的可选链接

javascript - React-router多个组件在auth情况下onEnter?

java - 通量。有没有办法重试最后一个元素?

java - Grails 服务无法使用 Optional 类

Swift:如何将 Optional ("Optional(str)") 转换为 Int

reactjs - 可以从shouldComponentUpdate 中调用setState 吗?