spring-webflux - 如何在 Flux 中迭代一个对象并对其进行操作?

标签 spring-webflux project-reactor reactor reactive-streams

我正在使用项目 react 堆,我想执行以下操作:

    @Override
    public void run(ApplicationArguments args) {
        Flux.from(KafkaReceiver.create(receiverOptions)
                        .receive()
                        .map(this::getObject)
                        .flatMap(this::iterateElasticWrites)
                        .flatMap(this::writeTheWholeObjectToS3)
        ).subscribe();
    }

    // What I'd like to do - but a non reactive code
    private Publisher<MyObj> iterateElasticWrites(MyObj message) {
        for (MyDoc file: message.getDocs()) {
            writeElasticDoc(file.getText());
        }
        return Mono.just(message);
    }

我正在努力寻找与 Project Reactor 中的 iterateElasticWrites 等效的方法。我想对我的对象 (MyObj) 执行一次迭代,并将其每个文档列表的元素 react 性地写入 elasticsearch。

最佳答案

在 Reactor 中,您始终需要使用不同的运算符构建 react 流,所有反应/异步代码都应返回 MonoFlux

看看你的例子,它看起来像

private Mono<MyObj> iterateElasticWrites(MyObj message) {
    return Flux.fromIterable(message.getDocs())
            .flatMap(doc -> writeElasticDoc(doc.getText()))
            .then(Mono.just(message));
}

其中 writeElasticDoc 可以定义为

private Mono<Void> writeElasticDoc(String text) {
    ...
}

关于spring-webflux - 如何在 Flux 中迭代一个对象并对其进行操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71905954/

相关文章:

spring-webflow - 支持通过 Spring Cloud Gateway 代理 SOAP/XML 端点

twisted - 在某个条件下停止扭曲 react 器

java - react 堆模式如何与线程一起工作

spring - 无法在 react 测试中初始化 'javax.el.ExpressionFactory'

java - Spring 5 webflux 如何为现有 Webclient 设置超时

java - 在 Spring Boot 中使用 @EnableWebFluxSecurity 时出错

java - 方法引用 : cannot convert reactor. core.publisher.Mono<S> 到reactor.core.publisher.Mono<? 中的返回类型错误延伸R>

spring-webflux - react 堆句柄运算符返回对象?

java - 在 java flux 中按对象属性分组