java - Reactor Mono - executing parallel tasks

Tags: java reactive-programming spring-webflux project-reactor reactor

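I am new to the Reactor framework and am trying to use it in one of our existing implementations. LocationProfileService and InventoryService each return a Mono; they should be executed in parallel and do not depend on each other (both are called from MainService). Within LocationProfileService, 4 queries are issued, and the last 2 depend on the first.

Is there a better way to write this? I see the calls executing sequentially, although some of them should run in parallel. What is the right way to do it?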

public class LocationProfileService {
        static final Cache<String, String> customerIdCache //define Cache

        @Override
        public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
            // These two calls do not depend on each other and can be started immediately
            Mono<String> customerAccountMono = getCustomerArNumber(customerId, location).subscribeOn(Schedulers.parallel()).switchIfEmpty(Mono.error(new CustomerNotFoundException(location, customerId))).log();
            Mono<LocationProfile> locationProfileMono = Mono.fromFuture(/* location query */).subscribeOn(Schedulers.parallel()).log();

            // Should block() be called here, or is there a better way to do this?
            String custAccount = customerAccountMono.block(); // This must execute first: its value is needed for the next two calls

            Mono<Customer> customerMono = Mono.fromFuture(/* query uses custAccount from the earlier step */).subscribeOn(Schedulers.parallel()).log();
            Mono<Result<LocationPricing>> locationPricingMono = Mono.fromFuture(/* query uses custAccount from the earlier step */).subscribeOn(Schedulers.parallel()).log();

            return Mono.zip(locationProfileMono,customerMono,locationPricingMono).flatMap(tuple -> {
                LocationProfileInfo locationProfileInfo = new LocationProfileInfo();
                //populate values from tuple
                return Mono.just(locationProfileInfo);
            });


        }

        private Mono<String> getCustomerAccount(String conversationId, String customerId, String location) {
            return CacheMono.lookup((Map)customerIdCache.asMap(), customerId).onCacheMissResume(Mono.fromFuture(/* query */).subscribeOn(Schedulers.parallel()).map(x -> x.getAccountNumber()));
        }

}


public class InventoryService {

    @Override
    public Mono<InventoryInfo> getInventoryInfo(String inventoryId) {
        Mono<Inventory> inventoryMono = Mono.fromFuture(/* inventory query */).subscribeOn(Schedulers.parallel()).log();
        Mono<List<InventorySale>> isMono = Mono.fromFuture(/* inventory sale query */).subscribeOn(Schedulers.parallel()).log();

        return Mono.zip(inventoryMono,isMono).flatMap(tuple -> {
            InventoryInfo inventoryInfo = new InventoryInfo();
            //populate value from tuple

            return Mono.just(inventoryInfo);

        });
    }

}

public class MainService {

        @Autowired
        LocationProfileService locationProfileService;
        @Autowired
        InventoryService inventoryService;

        public void mainService(String customerId, String location, String inventoryId) {
            Mono<LocationProfileInfo> locationProfileMono = locationProfileService.getProfileInfoByLocationAndCustomer(....);
            Mono<InventoryInfo> inventoryMono = inventoryService.getInventoryInfo(....);

            // Is using block() fine here, or is there a better way to do this?
            Mono.zip(locationProfileMono,inventoryMono).subscribeOn(Schedulers.parallel()).block();
        }

}

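Best answer

You do not need to block to pass that value along; your code is very close to the solution. I wrote the code below using the class names you provided. Just replace each Mono.just(....) with a call to the appropriate service.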

public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    Mono<String> customerAccountMono = Mono.just("customerAccount");
    Mono<LocationProfile> locationProfileMono = Mono.just(new LocationProfile());

    return Mono.zip(customerAccountMono, locationProfileMono)
            .flatMap(tuple -> {
                Mono<Customer> customerMono = Mono.just(new Customer(tuple.getT1()));
                Mono<Result<LocationPricing>> result = Mono.just(new Result<LocationPricing>());
                Mono<LocationProfile> locationProfile = Mono.just(tuple.getT2());
                return Mono.zip(customerMono, result, locationProfile);
            })
            .map(LocationProfileInfo::new);
}

public static class LocationProfileInfo {
    public LocationProfileInfo(Tuple3<Customer, Result<LocationPricing>, LocationProfile> tuple){
        // do whatever
    }
}


public static class LocationProfile {}

private static class Customer {
    public Customer(String customerAccount) {
    }
}

private static class Result<T> {}

private static class LocationPricing {}

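Keep in mind that the first zip is not necessary. I wrote it that way to match your solution, but I would solve the problem in a slightly different way, which I think is clearer.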

public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    return Mono.just("customerAccount") // call the service
            .flatMap(customerAccount -> {
                // declare the call to get the customer
                Mono<Customer> customerMono = Mono.just(new Customer(customerAccount));

                // declare the call to get the location pricing
                Mono<Result<LocationPricing>> result = Mono.just(new Result<LocationPricing>());

                // declare the call to get the location profile
                Mono<LocationProfile> locationProfileMono = Mono.just(new LocationProfile());

                // the services are actually executed when zip subscribes to them
                return Mono.zip(customerMono, result, locationProfileMono);
            })
            .map(LocationProfileInfo::new);
}
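
The snippets above use Mono.just(....) as placeholders, so they cannot show anything running concurrently. As a rough, self-contained sketch (not part of the original answer), the same flatMap idea can be tried with real asynchronous sources. Everything named here is hypothetical: the class ProfileSketch, the four *Query methods, and the plain String results are stand-ins for the real queries and domain types, and the sketch assumes reactor-core is on the classpath. Unlike the second version above, it keeps the location profile outside the flatMap, since that query does not need the account.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

public class ProfileSketch {

    // Hypothetical stand-ins for the real queries; each one completes asynchronously.
    static CompletableFuture<String> accountQuery(String customerId) {
        return CompletableFuture.supplyAsync(() -> "account-" + customerId);
    }

    static CompletableFuture<String> locationQuery(String location) {
        return CompletableFuture.supplyAsync(() -> "profile-" + location);
    }

    static CompletableFuture<String> customerQuery(String account) {
        return CompletableFuture.supplyAsync(() -> "customer-of-" + account);
    }

    static CompletableFuture<String> pricingQuery(String account) {
        return CompletableFuture.supplyAsync(() -> "pricing-of-" + account);
    }

    static Mono<String> profileInfo(String customerId, String location) {
        // Does not need the account, so it is zipped at the top level.
        // The Supplier overload of fromFuture defers the query until subscription.
        Mono<String> locationProfile = Mono.fromFuture(() -> locationQuery(location));

        // The two dependent queries are declared inside flatMap,
        // where the account value is available without calling block().
        Mono<String> customerAndPricing = Mono.fromFuture(() -> accountQuery(customerId))
                .flatMap(account -> {
                    Mono<String> customer = Mono.fromFuture(() -> customerQuery(account));
                    Mono<String> pricing = Mono.fromFuture(() -> pricingQuery(account));
                    return Mono.zip(customer, pricing, (c, p) -> c + "|" + p);
                });

        // zip subscribes to both sources, so the location query and the
        // account-dependent chain are both started before either result arrives.
        return Mono.zip(locationProfile, customerAndPricing,
                (profile, rest) -> profile + "|" + rest);
    }

    public static void main(String[] args) {
        // block() appears only at the edge of this demo, to print the result.
        System.out.println(profileInfo("42", "NYC").block());
    }
}
```

In this sketch no subscribeOn is used, because CompletableFuture.supplyAsync already runs each query on another thread. The same shape should carry over to MainService: zipping the two service Monos and returning the resulting Mono to the caller would avoid block() there as well, assuming the caller can work with a Mono.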

Regarding java - Reactor Mono - executing parallel tasks, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54281833/
