java - 如何在Reactor中进行分页?

标签 java spring spring-webflux project-reactor spring-webclient

我向第三方 Web 服务重复发出分页 WebClient 请求。我现在的实现可以工作,但正在阻塞。

到目前为止我的实现:

var elementsPerPage = 10;
Flux
    .generate(
        () -> 0,
        (pageIndex, emitter) -> {
            BlahServiceResponse blahServiceResponse =
                webClient
                    .get()
                    .uri("/blah?pageIndex={pageIndex}", pageIndex)
                    .retrieve()
                    .bodyToMono(BlahServiceResponse.class)
                    .block(); // Yuck!!!
            if (blahServiceResponse.getStudents().size() > 0) {
                emitter.next(blahServiceResponse);
            } else {
                emitter.complete();
            }
            return pageIndex + elementsPerPage;
        }
    )
    .subscribe(System.out::println); // Replace me with actual logic

出于可以理解的原因,如果将上面的代码更改为以下内容,则会抛出“IllegalStateException:生成器没有调用任何 SynchronousSink 方法”异常:

webClient
    .get()
    ...
    .bodyToMono(BlahServiceResponse.class)
    .subscribe(emitter::next);

所以我开始寻找异步接收器和 realized it was Flux|MonoSink 。但据我所知,Flux 中没有支持使用 Flux|MonoSink 生成有状态元素的构建器方法。

我错过了什么吗?有没有更优雅的方法?

最佳答案

静态分页

如果您事先知道页面索引并且有生成它的规则。

var elementsPerPage = 10;

Flux.generate(
        () -> 0,
        (pageIndex, emitter) -> {
            if (pageIndex < 30) {
                emitter.next(pageIndex);
            } else {
                emitter.complete();
            }
            return pageIndex + elementsPerPage;
        })
        .flatMap(pageIndex -> webClient
                .get()
                .uri("/blah?pageIndex={pageIndex}", pageIndex)
                .retrieve()
                .bodyToMono(BlahServiceResponse.class))
        .subscribe(System.out::println);

动态分页

下一页索引是否依赖于最后查询的页面。

public static void main(String[] args) {
    var elementsPerPage = 10;

    callWithPageIndex(0)
            .expand(pagedResponse -> {
                if (pagedResponse.getResponse().isEmpty()) {
                    return Mono.empty();
                } else {
                    return callWithPageIndex(pagedResponse.getPageIndex() + elementsPerPage);
                }
            })
            .subscribe(System.out::println);
}

private static Mono<PagedResponse<BlahServiceResponse>> callWithPageIndex(Integer pageIndex) {
    return webClient
            .get()
            .uri("/blah?pageIndex={pageIndex}", pageIndex)
            .retrieve()
            .bodyToMono(BlahServiceResponse.class)
            .map(response -> new PagedResponse<>(pageIndex, response));
}

@lombok.Value
static class PagedResponse<T> {
    int pageIndex;
    T response;
}

关于java - 如何在Reactor中进行分页?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67129283/

相关文章:

java - 如何配置 beanio 以忽略平面文件中的某些字符

java - 无法从 URL 位置导入 bean 定义 [classpath :applicationContext-core. xml]

javascript - 如何在 Backbone.js 中获取错误消息?

spring - 如何在 Postman 中查看 Spring 5 Reactive API 的响应?

spring-boot - Webclient 在 Docker 中泄漏内存?

json - 使用 WebFlux 从资源中读取和解析文件的 react 方式?

java - 对于 Java,是否可以接受使用无界通配符类型作为方法的参数,然后检查并将其转换为参数化类型?

java - 接口(interface)类名中的标签 <>

java - gradle:跳过任务 ':compileJava',因为它没有源文件

java - Spring 工具套件中的 maven-compiler-plugin :3. 7.0:compile (default-compile)