java - Observable.map 抛出错误未在订阅中捕获

标签 java rx-java project-reactor

代码如下:

public void initiateProcess() {

findAbandonedOpenOrders() //returns Observable<AsyncN1QueryResult>
    .flatMap(results -> results.rows())
    .map(row -> row.value())
    .map(s -> processStringToGetOrderId(s.toString()))
    .map(
        o -> {
          log.info("Generating access token for orderId: {}", o);
          return identityConnector
              .getServiceTokenFromIdentity()
              .map(
                  issueToken ->
                      RequestInputModel.builder()
                          .authorisationToken(issueToken.getAccessToken())
                          .orderId(o)
                          .build())
              .map(
                  requestInputModel -> {
                    log.info(
                        "Invoking cancel order for orderId: {}",
                        requestInputModel.getOrderId());
                    return cancelOrderApiConnector
                        .invokeAPI(
                            requestInputModel,
                            RequestInputModel.RequestBodyModel.builder().build())  //throw RuntimeException as soon as the flow enters this method
                        .subscribe();
                  })
              .subscribe();
        })
    .subscribe(
        s -> {},
        e -> {
          log.error(ExceptionUtils.getStackTrace(e));
        });
  }

一旦我调用cancelOrderApiConnector.invokeAPI,它就会抛出运行时异常。因此,直接的 map 应该抛出它,而最外面的 map 又应该抛出它。但这并没有发生。堆栈跟踪不会被打印(逻辑在订阅中实现)。

有人可以告诉我我可能做错了什么吗?

最佳答案

不要破坏 react 链,将这些 map(v -> codeThatProduceAFlux.subscribe()) 替换为 flatMap(v -> codeThatProduceAFlux) 并且错误将正确在子步骤之间传播

关于java - Observable.map 抛出错误未在订阅中捕获,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52976525/

相关文章:

java - IntelliJ 无法识别 JDK 版本

java - 在jsp中传递变量时出错?

java - 如何从 Observable.fromIterable() 获得最终结果?

java - 退出循环而不执行map或flatMap

project-reactor - 由合并构建的 Flux 是如何完成的

java - 2个节点之间的最长路径

java - 在使用 BoxLayout 面板构建 Java GUI 时,如何将两个 swing 组件添加到同一行?

android - RxJava - 订阅者只运行一次吗

java - RxJava 的 zip 操作符问题

java - Flux 中 Mono 的 doOnSuccess 方法相当于什么?