我在玩 Flow API,到目前为止,我知道 request()
方法用于背压。大多数文章指出,这类似于控制消费速度。
但是,我看到的几乎每个示例代码都传递值 1
进request()
方法,例如,subscription.request(1)
.但我不太明白 request()
是怎么做的方法是控制消耗速度。
我试图通过向发布者发送一堆项目并打印线程名称来运行测试,似乎每个 onNext()
运行在同一个工作线程上,因为我正在使用 request(1)
或 request(50)
:
@Override
public void onNext(T item) {
System.out.println(Thread.getCurrent().getName());
Thread.sleep(5000);
subscription.request(50);
}
如果
onNext()
在不同的线程中运行,我可以理解 n
传入 request(n)
的值将影响并行处理项目的速率(在 n
个线程中运行)。但在我的测试中似乎并非如此,因为它们都在相同的线程名称下运行。在这种情况下,
request(1)
和有什么区别?和 request(50)
当它们仍然要在同一个线程上一个接一个地依次运行时?那消费率不是还是一样吗?
最佳答案
n
在 request
指示订阅者可以接受多少个元素并限制上游 Publisher
的项目数量可以发出。因此,这个生成器的减慢不是每个单独的项目,而是由消费者的处理时间交错生成的每个批次的平均时间。onNext
以序列化方式执行,并且取决于上游,也在同一线程上执行。因此,调用 request
在那里通常表示它可以调用相同的上游 onNext
, 当前通话结束后 , 下一个值(如果可用)。即,调用 Thread.sleep
将推迟下一次调用 onNext
.
一般来说,没有理由调用request
在 onNext
终端订阅者,因为它与其直接上游同步运行 Publisher
和单个 request(Long.MAX_VALUE)
之间没有实际区别并重复 request(1)
.
调用 request
的少数理由之一如果onNext
fork 异步工作本身,只有在该工作结束时才应该请求更多项目:
Executor executor = ...
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
this.upstream = s;
executor.execute(() -> {
Thread.sleep(5000);
s.request(1);
return null; // Callable
});
}
@Override public void onNext(T item) {
System.out.println("Start onNext");
executor.execute(() -> {
System.out.println("Run work");
Thread.sleep(5000);
System.out.println("Request more work");
upstream.request(1);
return null; // Callable
});
System.out.println("End onNext");
}
使用此设置,上游将调用
onNext
一次,只有当执行者执行的任务发出下一个请求时才会调用它。请注意,除非 Publisher
从专用线程发出,上面的例子最终会拖拽 onNext
调用到 executor
的线程。
关于java - Flow API 中的 subscription.request(n) 如何在任意 n 值执行背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59555464/