java - 我可以对 subscribeOn 方法和异步任务使用相同的执行器吗

标签 java multithreading java.util.concurrent spring-webflux project-reactor

嗨,我有一个简单的问题,假设我有一个如下所示的类(class):

import lombok.Value;

import java.nio.file.Path;

@Value
class ImageResizeRequest {

    private DownloadedImage downloadedImage;

    private ImageSize imageSize;

    private Path destinationLocation;
}

上面的类代表负责将图像大小调整为给定大小的单个任务。我有很多请求将此图像调整为多种不同的尺寸。

@RequiredArgsConstructor
class ImageResizeService {

    private final Executor executor;

    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

        return Flux.fromIterable(requests)
                .flatMap(this::resize)
                .collectList()
                .subscribeOn(Schedulers.fromExecutor(executor));
    }

    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {

        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));

    }

    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {
        return () -> {
            //TODO add image resize logic for example ImageMagick by Im4Java...
            /** code below call ImageMagick library
             ConvertCmd cmd = new ConvertCmd();
             IMOperation op = new IMOperation();
             op.quality(100d);
             op.addImage(request.getDestinationLocation().toString());
             cmd.run(op);

             */
            //TODO add logic!!!
            return new ImageResizeResult(null, null, null, null);
        };
    }
}

我的问题是: 如何在 Project Reactor 中实现负责调整图像大小的并行独立任务?如果没有 Project Reactor,我将使用 CompletableFuture 列表:

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}

具有指定的执行器服务。此外,在我的示例中,我在 subscribeOn 方法和 SupplyAsync 中使用相同的执行器 - 是个好主意吗?

最佳答案

不要不断地从 ExecutorService 重新创建 Scheduler,而是努力将其直接包装在构造函数中。

您根本不需要 CompletableFuture,并且 subscribeOn 应该应用于 flatMap 的内部,以可能选择单独的线程每个调整大小任务(它从适用于每个 Flux 的池中选择一个线程):

class ImageResizeService {

  private final Executor executor; //TODO prefer an ExecutorService if possible
  private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)

  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //for each request, perform asynchronous resize...
            .flatMap(r -> Mono
                //... by converting the resizeTask Callable to a Mono
                .fromCallable(r -> resizeTask(r).get())
                //... and making sure it executes on the executor
                .subscribeOn(scheduler)
            )
            .collectList();
  }
}

为了实现真正的并行化,您还有另一个选择:parallel().runOn():

Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //divide into N workloads
            //the executor _should_ be capable of this degree of parallelisation:
            .parallel(NUMBER_OF_DESIRED_THREADS)
            //actually tell to run each workload on a thread picked from executor
            .runOn(scheduler) 
            //here the workload are already running on their dedicated thread,
            //we can afford to block it and thus apply resize in a simpler `map`
            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
            //go back to a `Flux` sequence for collection into list
            .sequential()
            .collectList();
}

关于java - 我可以对 subscribeOn 方法和异步任务使用相同的执行器吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52028743/

相关文章:

java - C 结构到 Java JNA 结构(指向结构的指针)

c++ - 尝试加入时线程崩溃

Java多线程,Thread.sleep暂停当前线程但不会继续其他线程

java - 对于阻塞集合中的每个

java - 彩信查询仅返回已发送附件的消息大小

java - Java 中删除的文件会发生什么?

java重用一个执行器

java - 为什么 count less 还要用 synchronized 关键字呢?

java - 'java.lang.NullPointerException' 错误的未知来源

c++ - 如果我只需要它在其他线程中的值,我是否应该在一个线程中锁定一个变量,如果我不这样做,为什么它会起作用?