java - 从昂贵的方法传递结果,因为它们来自多个层

标签 java

我有一个与此类似的代码:

List<String> ids = expensiveMethod();
List<String> filteredIds = cheapFilterMethod(ids);

if (!filteredIds.isEmpty()) {
    List<SomeEntity> fullEntities = expensiveDatabaseCall(filteredIds);
    List<SomeEntity> filteredFullEntities = anotherCheapFilterFunction(fullEntities);
    if (!filteredFullEntities.isEmpty()) {
        List<AnotherEntity> finalResults = stupidlyExpensiveDatabaseCall(filteredFullEntities);
        relativelyCheapMethod(finalResults);
    }
}

它基本上是几个昂贵方法的瀑布,这些方法本身要么从数据库中获取某些内容,要么过滤以前的数据库结果。这是由于 stupidlyExpentialDatabaseCall 造成的,它需要尽可能少的剩余实体,因此需要进行详尽的过滤。

我的问题是,其他函数也不是都很便宜,因此它们会阻塞线程几秒钟,而 stupidlyExpectiveDatabaseCall 正在等待并且什么也不做,直到它一次获得整个批处理.

我想处理每个方法的结果当它们进来时。我知道我可以为每个单独的方法编写一个线程,并在它们之间有一些并发队列,但这是我想避免的大量样板文件。有更优雅的解决方案吗?

最佳答案

a post关于并行化的不同方法,不仅包括parallelStream()方法,还包括连续步骤按照您描述的方式并行运行,通过队列链接。 RxJava可能适合您在这方面的需求。它是 java9 中相当零散的响应式(Reactive)流 API 的更完整的变种。但我认为,只有使用reactive db api,您才真正在那里。随之而来。

这就是 RxJava 方式:

public class FlowStream {

@Test
public void flowStream() {
    int items = 10;

    print("\nflow");
    Flowable.range(0, items)
            .map(this::expensiveCall)
            .map(this::expensiveCall)
            .forEach(i -> print("flowed %d", i));

    print("\nparallel flow");
    Flowable.range(0, items)
            .flatMap(v ->
                    Flowable.just(v)
                            .subscribeOn(Schedulers.computation())
                            .map(this::expensiveCall)
            )
            .flatMap(v ->
                    Flowable.just(v)
                            .subscribeOn(Schedulers.computation())
                            .map(this::expensiveCall)
            ).forEach(i -> print("flowed parallel %d", i));

    await(5000);

}

private Integer expensiveCall(Integer i) {
    print("making %d more expensive", i);
    await(Math.round(10f / (Math.abs(i) + 1)) * 50);
    return i;
}

private void await(int i) {
    try {
        Thread.sleep(i);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

private void print(String pattern, Object... values) {
    System.out.println(String.format(pattern, values));
}

}

Maven 仓库:

   <!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.13</version>
    </dependency>

关于java - 从昂贵的方法传递结果,因为它们来自多个层,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58583102/

相关文章:

java - 由 : org. codehaus.jackson.map.JsonMappingException : Can not deserialize instance of com. model.user 超出 START_ARRAY token 引起

java - 完全支持 POJO 中的 JavaFX 属性

java - 向 JPanel 添加图形时遇到问题

java - 通过传递引用与返回引用来反序列化对象

java - java.util.List 中的第一个元素被覆盖

java - 如何将数据从servlet发送到rest api

java - 输出是什么,为什么?

java - 在JavaFX应用程序的Main方法中获取.fxml文件中定义的TextArea

使用 XPath 进行 Java XML 解析?

java - JUnit 和集成测试 - 是否可以在运行的任何测试之前运行一个