我正在尝试 Java 9 中的一些新功能。所以我进行了一个测试,以拥有一个发布者,以给定的速率发布数字。我还实现了一个订阅者来收听这些出版物并将它们打印到控制台。
虽然我可能不完全理解如何使用这个 Api,因为 onNext()
方法不打印任何东西和getLastItem()
只返回 0。
唯一似乎有效的部分是 onSubscribe()
正确初始化 lastItem
多变的。
@Test
public void testReactiveStreams(){
//Create Publisher
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//Register Subscriber
TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
publisher.subscribe(subscriber);
assertTrue(publisher.hasSubscribers());
//Publish items
System.out.println("Publishing Items...");
List.of(1,2,3,4,5).stream().forEach(i -> {
publisher.submit(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// blah
}
});
assertEquals(5, subscriber.getLastItem());
publisher.close();
}
private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
private int lastItem;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed");
lastItem = 0;
}
@Override
public void onNext(Integer item) {
System.out.println("Received : "+item);
lastItem += 1; // expect increment by 1
assertTrue(lastItem == item);
}
@Override
public void onError(Throwable throwable) {
// nothing for the moment
}
@Override
public void onComplete() {
System.out.println("Completed");
}
public int getLastItem(){
return lastItem;
}
}
有人可以告诉我我在测试中做错了什么吗?我希望测试能够打印这些数字并返回 5 作为最后一项。
我不得不说我只在 Angular2 中使用 Observables 和 Subjects,尽管它们似乎更容易理解。
最佳答案
Flow API 实现了一项称为背压 (explained here in the context of RxJava) 的功能,这意味着发布者不应该通过发布项目的速度超过其处理它们的速度来压倒订阅者。 JDK 9 实现这一点的方式是让订阅者从订阅中请求项目。
对于您的测试,TestIntegerSubscriber
应该请求项目onSubscription
,假设为 10,并记录 onNext
的频率已被调用,因此一旦推送了 10 个项目,它就可以请求更多。
我写了a section about the Flow API更详细一点。它还描述了发布者、订阅者和订阅之间的交互:
Publisher
和 Subscriber
. Publisher::subscribe
订阅订阅者. Subscription
并调用Subscriber::onSubscription
使用它,以便订阅者可以存储订阅。 Subscription::request
请求一些项目。 Subscriber::onNext
开始将项目交给订阅者。 .它永远不会发布超过请求数量的项目。 Subscriber::onComplete
或 Subscriber::onError
, 分别。 Subscription::cancel
来切断连接。 . 关于reactive-programming - 从 react 流 SubmissionPublisher 接收项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42794317/