java - 使用 react 堆即发即忘

标签 java spring spring-boot spring-webflux project-reactor

我的 Spring boot 应用程序中有如下方法。

public Flux<Data> search(SearchRequest request) {
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;
}

//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
  //do some processing here
}

目前,我正在使用@Async带注释的服务类 doThisAsync ,但不知道如何传递List<Data> ,因为我不想打电话block 。 我只有Mono<List<Data>> .

我的主要问题是如何单独处理这个 Mono 和 search方法应返回 Flux<Data> .

最佳答案

1,如果您的即发即忘已经是异步返回Mono/Flux

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public Mono<Void> doThisAsync(List<Data> data) {
    //do some async/non-blocking processing here like calling WebClient
}

2,如果您的即发即弃确实阻塞了 I/O

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.elastic())  // delegate to proper thread to not block main flow
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public void doThisAsync(List<Data> data) {
    //do some blocking I/O on calling thread
}

请注意,在上述两种情况下,您都会失去背压支持。如果 doAsyncThis 由于某种原因变慢,那么数据生产者不会关心并继续生产项目。这是即发即弃机制的自然结果。

关于java - 使用 react 堆即发即忘,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57566465/

相关文章:

java - 每次调用其方法时都会重新创建 CGlib-wired bean

java - Spring JPA 一个实体中的多个多对一关系

@Preauthorize 中的 Spring Boot 属性

java - 为什么 jsp 的静态部分在 java 部分之后进行流式传输?

java - 在工厂模式中使用反射

java - spring 3、hibernate、dbcp 和 derby 连接问题

java - 一对多 : java. sql.SQLSyntaxErrorException:表 'table_name' 不存在

java - 使用 MS SQL hibernate

java - 从字符串数组创建 Android 位置

java - Logback 创建日志文件但不写入任何内容