java - 提交方法不会调用 onNext FLOW STREAM API JAVA

标签 java reactive-programming publish-subscribe java-9

我正在学习 Java 中的 FLOW Stream API,我目前正在创建一个基于 oracle community 上的示例的示例.问题是我看不到预期的输出,而只看到打印在 onSubscribe 方法中的 SUBSCRIBING 字符串。我已经检查并找到了submissionpublisher-on-submit-not-invoking-onnext-of-subscriber在 StackOverflow 上,但没有用,因为我已经在调用 request(Long N)

import java.util.concurrent.Flow;

public class Computer<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("SUBSCRIBING");
        this.subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        System.out.println(String.format("Got %s", item.toString()));
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("DONE");
    }

}

--

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

public class Sensor {

    public static void main(String[] args) {
        SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>();
        Computer<String> subscriber = new Computer<>();
        submissionPublisher.subscribe(subscriber);

        List<String> items = List.of("1.25", "1.224", "1.55");
        items.forEach(submissionPublisher::submit);
        submissionPublisher.close();
    }

}

我刚刚看到:

SUBSCRIBING

为什么 onNext 方法没有被调用?

最佳答案

您没有将 ScheduledExecutorService 传递给 Publisher,它基本上是一个 ExecutorService,它可以安排任务在延迟后运行或以固定的时间间隔重复执行在每次执行之间。

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;

public class Sensor {

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>(executor, 5);
        Computer<String> subscriber = new Computer<>();
        submissionPublisher.subscribe(subscriber);

        List<String> items = List.of("1.25", "1.224", "1.55");
        items.forEach(submissionPublisher::submit);
        submissionPublisher.close();
        executor.shutdown();
    }
}

关于java - 提交方法不会调用 onNext FLOW STREAM API JAVA,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58193307/

相关文章:

java - 如何在 Java 中创建没有指定长度的字符数组?

javascript - 发布更改时防止 Meteor 删除已发送到客户端的 MiniMongo 数据

scala - Akka DistributedPubSubMediator at-least-once delivery guarantees for publishing to a topic

python - Django/gevent socket.IO 与 redis pubsub。我把东西放在哪里?

java - 如何在 webspec 上禁用 javascript?

java - 当所有任务完成时,执行器服务如何发出信号?

javascript - 如何在rxjs中获取 throttle 值?

asp.net - 直接从 Angular 客户端订阅 Azure 事件网格主题

java - 约翰逊特罗特算法

rx-java - 实时计数 rx 中发出的元素