reactive-programming - 从 react 流 SubmissionPublisher 接收项目

标签 reactive-programming java-9 reactive-streams

我正在尝试 Java 9 中的一些新功能。所以我进行了一个测试,以拥有一个发布者,以给定的速率发布数字。我还实现了一个订阅者来收听这些出版物并将它们打印到控制台。

虽然我可能不完全理解如何使用这个 Api,因为 onNext()方法不打印任何东西和getLastItem()只返回 0。

唯一似乎有效的部分是 onSubscribe()正确初始化 lastItem多变的。

@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);

    assertTrue(publisher.hasSubscribers());

    //Publish items
    System.out.println("Publishing Items...");

    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());

    publisher.close();
}


private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {

    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }

    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }

    public int getLastItem(){
        return lastItem;
    }
}

有人可以告诉我我在测试中做错了什么吗?我希望测试能够打印这些数字并返回 5 作为最后一项。

我不得不说我只在 Angular2 中使用 Observables 和 Subjects,尽管它们似乎更容易理解。

最佳答案

Flow API 实现了一项称为背压 (explained here in the context of RxJava) 的功能,这意味着发布者不应该通过发布项目的速度超过其处理它们的速度来压倒订阅者。 JDK 9 实现这一点的方式是让订阅者从订阅中请求项目。

对于您的测试,TestIntegerSubscriber应该请求项目onSubscription ,假设为 10,并记录 onNext 的频率已被调用,因此一旦推送了 10 个项目,它就可以请求更多。

我写了a section about the Flow API更详细一点。它还描述了发布者、订阅者和订阅之间的交互:

  • 创建 PublisherSubscriber .
  • 通过 Publisher::subscribe 订阅订阅者.
  • 发布者创建 Subscription并调用Subscriber::onSubscription使用它,以便订阅者可以存储订阅。
  • 在某些时候,订户调用Subscription::request请求一些项目。
  • 发布者通过调用 Subscriber::onNext 开始将项目交给订阅者。 .它永远不会发布超过请求数量的项目。
  • 发布者可能在某个时候被耗尽或遇到麻烦并调用 Subscriber::onCompleteSubscriber::onError , 分别。
  • 订阅者可能会不时地继续请求更多的项目,或者通过调用 Subscription::cancel 来切断连接。 .
  • 关于reactive-programming - 从 react 流 SubmissionPublisher 接收项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42794317/

    相关文章:

    ios - react cocoa 条件延迟

    macos - 无法在 Mac OS 终端上找到 jdk9

    java - 在 Java 9 中,是否可以使用 RMI 替代 HTTP 隧道?

    java - 如何获取 Flux 的最后一项而不用 reduce() 或 last() 折叠它

    javascript - 对象作为 React 子对象无效(找到 : object with keys)

    reactive-programming - 如何做响应式(Reactive)六边形建筑

    java - Eclipse 3.8(或 Juno)能否在 Java 9 上运行?

    java - RxJava Flowable 缓存到单一死锁

    java - 独眼巨人 react 和异步重试 : How to retry on timeout?

    javascript - Rx.Observable.groupBy 会清理空流吗?