java - 如何并行执行 Hystrix 命令?

标签 java hystrix

我的应用程序中 Spring 服务的一个方法向其他两个微服务发出请求。我想使用 Hystrix 发出这些请求以使其具有容错能力,并且我想并行运行它们。

到目前为止,我为每次调用都实现了 HystrixObservableCommand,并使用 CountDownLatch 等待两个命令完成(或失败)。

当前的解决方案看起来非常冗长。是否可以使用 Observable 特性并行执行 Hystrix 命令?

所需的解决方案在伪代码中看起来像:

new LoadCustomerObservableCommand(customerClient, customerId).toObservable()
    .doOnError(throwable -> log.error("Failed to retrieve customer {} information for the reservation {}", customerId, reservationId, throwable))
    .doOnNext(customer -> myResponse.setCustomer(customer));

new GetTicketsObservableCommand(ticketsClient, reservationId).toObservable()
    .doOnError(throwable -> log.error("Failed to retrieve tickets for the reservation {}", reservationId, throwable))
    .doOnNext(tickets -> myResponse.setTickets(tickets));

final AtomicBoolean subRequestsFailed = new AtomicBoolean(false);

Observable.zip(customerObservable, ticketsObservable, (customer, tickets) -> null)
    .doOnError(throwable -> subRequestsFailed.set(true))
    .toBlocking()
    .first();

if (subRequestsFailed.get()) {
     throw new HystrixBadRequestException("One or more requests to submodules have been failed");
}

return dto;

不幸的是,这个理想的解决方案不起作用,因为 Hystrix 命令永远不会执行。

我目前的解决方案是:

    // execute requests to sub modules in parallel
    final CountDownLatch cdl = new CountDownLatch(2);
    final List<Throwable> failures = new ArrayList<>();

    // load customer information
    final Observable<CustomerDTO> customerObservable = customerRxClient.loadCustomer(customerId);

    customerObservable
            .doOnError(throwable -> {
                log.error("Failed to retrieve customer {} information for the reservation {}", customerId, reservationId, throwable);

                cdl.countDown();

                failures.add(throwable);
            })
            .doOnCompleted(cdl::countDown)
            .subscribe(customer -> {
                dto.getReservationOwner().setBirthday(customer.getBirthday());
                dto.getReservationOwner().setCustomerId(customer.getCustomerId());
                dto.getReservationOwner().setCitizenship(customer.getCitizenship());
                dto.getReservationOwner().setEmail(customer.getEmail());
                dto.getReservationOwner().setFirstName(customer.getFirstName());
                dto.getReservationOwner().setGender(customer.getGender());
                dto.getReservationOwner().setLastName(customer.getLastName());
                dto.getReservationOwner().setPhone(ofNullable(customer.getPhone()).map(v -> mappingService.map(v, PhoneDTO.class)).orElse(null));
            });

    // load tickets
    final Observable<List<TicketDTO>> ticketsObservable = ticketsClient.getTickets(reservationId);

    ticketsObservable
            .doOnError(throwable -> {
                log.error("Failed to retrieve tickets for the reservation {}", reservationId, throwable);

                cdl.countDown();

                failures.add(throwable);
            })
            .doOnCompleted(cdl::countDown)
            .subscribe(tickets -> dto.setTickets(tickets.stream()
                    .map(ticket -> ReservationDTO.TicketDTO.builder()
                            .guestSeqN(ticket.getGuestSeqN())
                            .qr(ticket.getQr())
                            .qrText(ticket.getQrText())
                            .usedAt(ticket.getUsedAt())
                            .build())
                    .collect(toList())));

    try {
        cdl.await();
    } catch (InterruptedException _ignore) {
        log.debug("Count down latch has been interrupted!", _ignore);
    }

    if (!failures.isEmpty()) {
        throw new HystrixBadRequestException("Request to submodule has been failed");
    }

 return dto;

最佳答案

您对所需解决方案的想法是正确的,因为它使用了 zip 组合器。 Hystrix 命令未在该解决方案中执行的原因是生成的 Observable 没有订阅者。来自documentation :

toObservable() — returns a "cold" Observable that won’t subscribe to the underlying Observable until you subscribe to the resulting Observable

只需调用 subscribe()组合 Observable 的方法:

Observable.zip(customerObservable, ticketsObservable, (customer, tickets) -> null)
          .take(1)
          .doOnError(throwable -> subRequestsFailed.set(true))
          .subscribe();

关于java - 如何并行执行 Hystrix 命令?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48169274/

相关文章:

java - 如何访问接口(interface)内的抽象类

java - HTTP 状态 503 - 此应用程序当前在部署时不可用

java - For 循环被跳过

java - 检查文件是否存在,如果存在则创建一个具有更高编号的新文件

spring-boot - Hystrix Command注解在应用中是如何工作的

eclipse - 如何确定我的 Java 应用程序的最低 JRE 版本和系统要求

java - HystrixCommand 和 HystrixObservableCommand 的区别

java - 当没有定义缓存时,HystrixCommand肯定执行 `run`还是 `fallback`

Spring-Cloud、Hystrix 和 JPA - LazyInitializationException

java - 为单个 Feign 客户端禁用 Hystrix