publish-subscribe - 为什么订阅者在不同情况下请求不同数量的元素?

标签 publish-subscribe spring-webflux publisher reactive-streams

我正在学习响应式(Reactive)流和发布-订阅实用程序,并且我正在使用发布者(在我的例子中为 Flux)和订阅者的默认行为。

我有两个场景,两者在 Flux 中具有相同数量的元素。但是当我分析日志时,onSubscribe 方法请求不同数量的元素(比如在一种情况下请求无界元素,在另一种情况下请求 32 个元素)。

以下是两个案例和日志:

        System.out.println("*********Calling MapData************");
        List<Integer> elements = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
          .log()
          .map(i -> i * 2)
          .subscribe(elements::add);
        //printElements(elements);
        System.out.println("-------------------------------------");

        System.out.println("Inside Combine Streams");
        List<Integer> elems = new ArrayList<>();
        Flux.just(10,20,30,40)
            .log()
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE),
                (two, one) -> String.format("First  : %d, Second : %d \n", one, two))
            .subscribe(new Consumer<String>() {
              @Override
              public void accept(String s) {

              }
            });
        System.out.println("-------------------------------------");

这是日志:

*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------

由于我没有使用任何自定义订阅者实现,那么为什么在“MapData”情况下,它会记录“[info] | request(unbounded)”并在““内部组合流””中如果正在记录“[info] | request(32)”

请提出建议。

最佳答案

首先,您应该知道这是预期的行为。

根据您使用的运算符,Reactor 将应用不同的预取策略:

  • 某些运算符将使用默认值,例如 32256
  • 如果您添加了具有特定值的缓冲运算符,某些安排将使用您提供的值
  • Reactor 可以猜测值流是有限的,并将请求无界值

如果您将运算符变体与 int prefetch 方法参数一起使用,或者如果您使用 BaseSubscriber 实现自己的 Subscriber,则始终可以更改此行为>(为此提供了几种有用的方法)。

最重要的是,您通常不需要关注该特定值;您只需关注该值即可。仅当您想针对特定数据源优化预取策略时,它才有用。

关于publish-subscribe - 为什么订阅者在不同情况下请求不同数量的元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48019281/

相关文章:

mongodb - Meteor:我应该如何更新用户集合以在对象/字典中包含一个新属性?

publish-subscribe - 从 DDS 中删除已读主题

spring - 如何使用 Spring WebClient 同时进行多个调用?

reactive-programming - 使用项目 react 器将验证包括到 react 链中

multithreading - ZeroMQ Pub/Sub 仅在订阅主题时丢弃消息

node.js - 使用 Redis 模式订阅

java - Spring WebFlux WebClient - 如何解决 400 错误请求

html - 自定义字体(字体)在发布时不起作用

swift - 如何在 SwiftUI/Combine 中将订阅者附加到状态或绑定(bind)?

Swift结合从func返回发布者