java - 使用具有动态参数的 Mono 方法

标签 java reactive-programming project-reactor

我有一个函数将输入作为 Mono 返回:

public static Mono<Integer> emitter(int param){
    return Mono.just(param)
            .delayElement(Duration.ofMillis(100)); //delay to simulate http response
}

我想以初始值 3 调用发射器一次,然后重复执行直到达到特定大小。这个重复逻辑应该在 main 方法中,这样我就无法修改 emitter()

public static void main(String[] args){
    int maxSize = 5;
    int initial = 3;
    Mono<Integer> response = emitter(initial);

    response
        .doOnNext(s -> {
            System.out.println("need more!");
        })
        .subscribe();
}

一个简单的解决方案是:

public static void main(String[] args){
    int maxSize = 5;
    int initial = 3;

    for(int i = 0; i < 999; i++) {
        Mono<Integer> response = emitter(initial+i);
        Mono<Boolean> isDone = response
                .flatMap(elem -> {
                    if(elem < maxSize) {
                        System.out.println("need more!");
                        return Mono.just(false);
                    } else {
                        System.out.println("ok done!");
                        return Mono.just(true);
                    }
                });
        if(isDone.block())
            break;
    }
}

基本上,我正在尝试根据前一个 Mono 的结果创建另一个具有动态参数的 Mono。我知道 Mono/Flux 是不可变的...... 有没有一种简洁且响应式(Reactive)的方法来做到这一点? 我尝试过诸如 Flux.range(0, Integer.MAX_VALUE).zipWith(myMono) 之类的方法来尝试将参数输入发射器,但无法使其工作。

PS。我知道我的例子没有多大意义。我试图简化涉及列表和 Spring WebFlux(发射器)的现实世界场景。

谢谢!

---编辑

好的,这就是我的想法:

public static void main(String[] args) throws InterruptedException {
    int maxSize = 5;
    int initial = 3;

    Flux.range(initial, 10)
            .delayElements(Duration.ofSeconds(1))  
            .flatMap(param -> emitter(param))
            .flatMap(it -> {
                if(it < maxSize) {
                    System.out.println("need more!: " + it);
                    return Mono.just(false);
                } else {
                    System.out.println("done!: " + it);
                    return Mono.just(true);
                }
             })
            .takeUntil(Boolean::booleanValue)
            .subscribe();

    Thread.sleep(6000);
}
need more!: 3
need more!: 4
done!: 5

一个问题是,如果我不延迟 Flux.range,则执行不会按顺序完成,并且输出的打印语句可能比预期的 3 行多或少。

最佳答案

您可以使用 Publisher 的 expand 函数,其作用类似于递归 例如

emitter(initial)
    .expand(i -> i < maxSize ? emitter(i + 1) : Mono.empty())
    .doOnNext(i -> System.out.println("i = " + i))
    .subscribe();

关于java - 使用具有动态参数的 Mono 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52154781/

相关文章:

java - 我试图同时掷三个骰子,直到它们落在数字 6 上

java - 为什么 IntelliJ 在我进行重构时会删除我的文件?所以我失去了所有的工作

java - 将 io.projectreactor 版本从 2.0.x 升级到 3.0.4 - 使用 Spring 框架

java - Files.createDirectory() 真的是非阻塞的吗?

java - Android editText 字符串返回 false 而不是 ""java

java - Do while 循环比较字符串

c# - 在 RX.net 和 WPF 中长时间运行 API 调用的正确方法

java - RxJava 2.0 - 如何将 Observable 转换为 Publisher

java - RxJava 流 : conditional operators and error handling

spring - 如何使用 Spring 5 WebClient 等待所有请求完成?