java - 使用 reactor 的 Flux.buffer 进行批处理仅适用于单个项目

标签 java kotlin project-reactor reactive-streams

我正在尝试使用 Flux.buffer() 从数据库批量加载。

用例是从数据库加载记录可能会“突发”,我想引入一个小缓冲区来尽可能将加载分组。

我的概念方法是使用某种形式的处理器,发布到它的接收器,让那个缓冲区,然后订阅和过滤我想要的结果。

我尝试了多种不同的方法(不同类型的处理器,以不同的方式创建过滤后的 Mono)。

以下是我到目前为止所取得的成果 - 主要是绊脚石。

目前,这会返回一个结果,但后续调用会被丢弃(尽管我不确定在哪里)。

class BatchLoadingRepository {
    // I've tried all manner of different processors here.  I'm unsure if
    // TopicProcessor is the correct one to use.
    private val bufferPublisher = TopicProcessor.create<String>()
    private val resultsStream = bufferPublisher
            .bufferTimeout(50, Duration.ofMillis(50))
            // I'm unsure if concatMapIterable is the correct operator here, 
            // but it seems to work.
            // I'm really trying to turn the List<MyEntity> 
            // into a stream of MyEntity, published on the Flux<>
            .concatMapIterable { requestedIds ->
                // this is a Spring Data repository.  It returns List<MyEntity>
                repository.findAllById(requestedIds)
            }

    // Multiple callers will invoke this method, and then subscribe to receive
    // their entity back.
    fun findByIdAsync(id: String): Mono<MyEntity> {

        // Is there a potential race condition here, caused by a result
        // on the resultsStream, before I've subscribed?
        return Mono.create<MyEntity> { sink ->
            bufferPublisher.sink().next(id)
            resultsStream.filter { it.id == id }
                    .subscribe { next ->
                        sink.success(next)
                    }
        }
    }
}

最佳答案

您好,我正在测试您的代码,我认为最好的方法是使用共享的 EmitterProcessor。我用emitterProcessor做了一个测试,它似乎工作。

Flux<String> fluxi;
EmitterProcessor emitterProcessor;

@Override
public void run(String... args) throws Exception {
    emitterProcessor = EmitterProcessor.create();

    fluxi = emitterProcessor.share().bufferTimeout(500, Duration.ofMillis(500))
            .concatMapIterable(o -> o);

    Flux.range(0,1000)
            .flatMap(integer -> findByIdAsync(integer.toString()))
            .map(s -> {
                System.out.println(s);
                return s;
            }).subscribe();

}

private Mono<String> findByIdAsync(String id) {
    return Mono.create(monoSink -> {
        fluxi.filter(s -> s == id).subscribe(value -> monoSink.success(value));
        emitterProcessor.onNext(id);
    });
}

关于java - 使用 reactor 的 Flux.buffer 进行批处理仅适用于单个项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55188127/

相关文章:

java - 我什么时候需要使用@WebServiceRef?

java - Java响应式框架的比较

spring - 为什么默认配置的spring webflux中没有异常堆栈跟踪?

java - 创建 bean 时是否可以让 ApplicationContext 使用 validator ?

java - MyContentProvider.onCreate() 没有被调用?

java - 如何优雅地支持firebase数据库上的数据类型更改?

具有可为空变量的 Kotlin 智能转换

kotlin - Kotlin将FileTime转换为日,月,年

spring-webflux - Spring 2.0 WebFlux : Merge multiple Mono<String> , 其中 string 是转换为 string 的 json,转换为单个 Flux<String>

java - 从文本文件中读取数据,将每个单词转换为 PigLatin