java - Flow API 中的 subscription.request(n) 如何在任意 n 值执行背压?

标签 java concurrency rx-java reactive-programming backpressure

我在玩 Flow API,到目前为止,我知道 request()方法用于背压。大多数文章指出,这类似于控制消费速度。

但是,我看到的几乎每个示例代码都传递值 1request()方法,例如,subscription.request(1) .但我不太明白 request() 是怎么做的方法是控制消耗速度。

我试图通过向发布者发送一堆项目并打印线程名称来运行测试,似乎每个 onNext()运行在同一个工作线程上,因为我正在使用 request(1)request(50) :

@Override
public void onNext(T item) {
   System.out.println(Thread.getCurrent().getName());
   Thread.sleep(5000);
   subscription.request(50);
}

如果onNext()在不同的线程中运行,我可以理解 n传入 request(n) 的值将影响并行处理项目的速率(在 n 个线程中运行)。但在我的测试中似乎并非如此,因为它们都在相同的线程名称下运行。

在这种情况下,request(1) 和有什么区别?和 request(50)当它们仍然要在同一个线程上一个接一个地依次运行时?那消费率不是还是一样吗?

最佳答案

nrequest指示订阅者可以接受多少个元素并限制上游 Publisher 的项目数量可以发出。因此,这个生成器的减慢不是每个单独的项目,而是由消费者的处理时间交错生成的每个批次的平均时间。
onNext以序列化方式执行,并且取决于上游,也在同一线程上执行。因此,调用 request在那里通常表示它可以调用相同的上游 onNext , 当前通话结束后 , 下一个值(如果可用)。即,调用 Thread.sleep将推迟下一次调用 onNext .

一般来说,没有理由调用requestonNext终端订阅者,因为它与其直接上游同步运行 Publisher和单个 request(Long.MAX_VALUE) 之间没有实际区别并重复 request(1) .

调用 request 的少数理由之一如果onNext fork 异步工作本身,只有在该工作结束时才应该请求更多项目:

Executor executor = ...

Subscription upstream;

@Override public void onSubscribe(Subscription s) {
    this.upstream = s;
    executor.execute(() -> {
       Thread.sleep(5000);
       s.request(1);
       return null; // Callable
    });
}

@Override public void onNext(T item) {
    System.out.println("Start onNext");
    executor.execute(() -> {
       System.out.println("Run work");
       Thread.sleep(5000);
       System.out.println("Request more work");
       upstream.request(1);
       return null; // Callable
    });
    System.out.println("End onNext");
}

使用此设置,上游将调用 onNext一次,只有当执行者执行的任务发出下一个请求时才会调用它。请注意,除非 Publisher从专用线程发出,上面的例子最终会拖拽 onNext调用到 executor的线程。

关于java - Flow API 中的 subscription.request(n) 如何在任意 n 值执行背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59555464/

相关文章:

haskell - 在 Haskell 中同时等待两个输入

ios - UITableView 自定义单元格,由 UIView 构建,可以并发吗?

java - 在 Java 6 中创建没有 lambda 表达式的 ReactiveX observable

rx-java - 实时计数 rx 中发出的元素

java - TableRow 权重缩放与我在 Android java 应用程序中使用 GraphView 时的预期相反

java - HTMLCLEANER 处理西类牙字符

java - jbpm 6 - 示例 Web 应用程序 "rewards-basic"错误

java - 如何使用uuid :randomUUID() in policy builder @Novell

java - 使用 Apache Spark 的 Hibernate 持久化导致进程阻塞

安卓 RxJava AlertDialog