java - 如何将订阅者与reactor.core.publisher.Flux连接?

标签 java project-reactor

我想将订阅者与 Reactor Flux 连接起来。但是我的小例子不会产生任何输出:

public static void main(String[] args) throws InterruptedException {
    Flux.just("a", "b", "c")
            .subscribe(new BaseSubscriber<String>() {
                @Override
                protected void hookOnSubscribe(Subscription ignored) {
                }

                @Override
                protected void hookOnNext(String value) {
                    System.out.print(value);
                }
            });
    Thread.sleep(1000);
}

我尝试了同样的方法 org.reactivestreams.Subscriber :

    Flux.just("a", "b", "c")
            .subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                }

                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            });

但同样,没有输出。

如果我尝试最简单的:

Flux.just("a", "b", "c").subscribe(System.out::println);

...它产生预期的输出:

a
b
c

如何使用订阅者从 Flux 接收值?

最佳答案

关键是从源中请求 n 个更多值。直到那个电话没有发生任何事情。

BaseSubscriber 示例:

Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // <-- here
            }

            @Override
            protected void hookOnNext(String value) {
                request(1); // <-- here
                System.out.println(value);
            }
        });

对于标准订阅者也是如此:

Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new Subscriber<String>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // <-- here
    }

    @Override
    public void onNext(String s) {
        subscription.request(1); // <-- here
        System.out.println(s);
    }

    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
});

关于java - 如何将订阅者与reactor.core.publisher.Flux连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45550530/

相关文章:

java - 错误: variable might not have been initialised in my CellPhoneBill program

java - 在 java flux 中按对象属性分组

java - Spring WebFlux Reactor - 更新 Flux 中的对象

java - "Bridging"来自 gRPC StreamObserver 的 Reactor 的 Flux

java - java.util.concurrent.CompletableFuture 中的异常传播

java - 如何内联 MS Office 文档而不是作为附件提供服务?

java - 如何使用 Java 而不是 XML 将 hbase 与 Spring Boot 一起使用?

java - react 堆重试不调用要重试的方法

java - Spring Reactive - 重用Mono值

java - 使用 Ivy API 获取最新版本的依赖项?