java - 将 Scala Future 转换为 Reactor Flux

标签 java spring project-reactor spring-reactor

我正在调用一个返回 Future of Scala 的 API,并且我想转换为 Reactor Flux ,显然不会阻塞并等待 future 的响应。我正在尝试使用 EmitterProcessor,当 future 成功时它工作得很好,但不幸的是当 future 有超时时它就不起作用。

这里是代码

private Flux<D> transformFutureToFlux(Future<T> future) {
    EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
    future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
    return Flux.from(emitterProcessor);
}

private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
    return new OnComplete() {
        @Override
        public void onComplete(Throwable t, Object result) {
            if (t != null) {
                processError(t);
            } else {
                processSucceed(result);
            }
        }

        private void processSucceed(Object result) {
            if (result instanceof ConnectorErrorVO) {
                publishConnectorErrorException((ConnectorErrorVO) result);
            } else {
                publishGenericResponse(result);
            }
        }

        private void publishGenericResponse(Object result) {
            if (result instanceof List<?>) {
                Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
            } else {
                Flux.just((D) result).subscribe(emitterProcessor);
            }
        }

        private void publishConnectorErrorException(ConnectorErrorVO result) {
            ConnectorErrorVO connectorErrorVO = result;
            Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
            error.subscribe(emitterProcessor);
        }

        private void processError(Throwable t) {
            ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
            if (t instanceof AskTimeoutException) {
                Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
            } else {
                Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
            }
        }
    };

我想做的是正确的吗?有更好的方法吗?

问候

最佳答案

如果你返回一个Future,则意味着只有1个返回,而不是一系列/流的返回值。 如果需要,您可以转换为 Flux,但为什么不使用 Mono 呢?

另外,如果您使用 scala,请尝试使用 reactor-scala-extensions SMano.fromFuture

检查SMonoTest举个例子。

    import scala.concurrent.ExecutionContext.Implicits.global
    StepVerifier.create(SMono.fromFuture(Future[Long] {
      randomValue
    }))
      .expectNext(randomValue)
      .verifyComplete()

关于java - 将 Scala Future 转换为 Reactor Flux,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58190038/

相关文章:

java - 合并 CSV 文件并写入 BigQuery 的 Google Cloud Dataflow 示例

java - 从资源中解析 XML 文件

java - 如何使用 Spring Data JPA 获取不同的实体字段?

java - SubscribeOn 仅使用池中的 1 个线程

java - 线程安全响应式(Reactive)缓存

java - onTouchEvent() 没有按我的预期工作

java - Android,播放音频剪辑的多个 ImageButtons

java - 如何在单元测试中断言 Spring Webflow 通过特定的结束状态退出?

java - Spring Boot 中的策略

multithreading - 如何在 Reactor 中使用多个线程执行 flatMap?