java - 通过 Spring Reactor 从 Future 创建 HotStream

标签 java spring reactive-programming reactor

我想制作一个示例原型(prototype),其中使用 Spring Reactor Stream API 由 ServiceA 返回的值来调用 ServiceC。所以我写了这样的代码

final ExecutorService executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        Streams.defer(executor.submit(new CallToRemoteServiceA()))
                .flatMap(s -> Streams.defer(executor.submit(new CallToRemoteServiceC(s))))
                    .consume(s -> System.out.println("End Result : " + s));

为了模拟 ServiceA 和 ServiceC 中涉及的延迟,CallToRemoteServiceA 和 CallToRemoteServiceC 的 call() 方法具有 Thread.sleep() 方法。问题是,当我注释掉 Thread.sleep() 方法时,即服务方法调用没有延迟,这在现实世界中是不正确的,消耗方法被调用。如果 Thread.sleep() 方法保留在适当的位置,则不会调用 Consumer 方法。我知道 Streams.defer() 返回一个冷流,因此它可能只对注册后接受的项目执行消费方法,但后来我想知道如何从 ExecutorService 返回的 Future 创建 HotStream?

最佳答案

我相信这是由于reactor.rx.stream.FutureStream.subscribe()方法中的错误造成的。在这一行中:

try {
        // Bug in the line below since unit is never null
        T result = unit == null ? future.get() : future.get(time, unit); 

        buffer.complete();

        onNext(result);
        onComplete();

} catch (Throwable e) {
        onError(e); <-- With default constructor this gets called if time == 0 and
                        future has as yet not returned
}

在这种情况下,当调用默认的 FutureStream(Future) 构造函数时,单位永远不会为 null,因此上面的代码总是调用 future.get(0, TimeUnit.SECONDS) ,导致 catch(Throwable) 中立即出现超时异常堵塞。如果你们同意这是一个错误,我可以提出拉取请求并修复此问题?

关于java - 通过 Spring Reactor 从 Future 创建 HotStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27513037/

相关文章:

rx-java - 将 bool 可观察值列表映射到输出,只要所有输入均为 true,该输出就会发出 true

javascript - RxJS - 无限滚动 - 在上一个未到达时发送请求

java - 在 mono.compose() 内部添加 doOnSuccess() 与简单的 mono.doOnSuccess() 有什么好处

java - Java中的系统日期格式

java - Spring boot - @ControllerAdvice 不起作用

spring - 使用@ManyToMany JPA 注释时不需要的唯一约束

java - Java Spring如何查询MongoDB?

java - Spring AspectJ 切点

java - 尝试使用 java 中的变量将对象设置为 true 或 false

java - 如何知道 Spring/CXF 何时准备好接受请求?