java - 如何正确使用 Reactor Publisher

标签 java spring reactor project-reactor

我不知道如何使用 Reactor 正确实现发布者/订阅者场景。我有一个可行的解决方案,但对我来说,实现似乎不正确:

我的问题是我需要手动实现发布者来注册订阅者并传递事件:

public void publishQuotes(String ticker) throws InterruptedException {

// [..] Here I generate some "lines" to be publisher

for (Subscriber<? super String> subscriber : subscribers) {
    lineList.forEach(line -> subscriber.onNext(line));
}

}

@Override
public void subscribe(Subscriber<? super String> subscriber) {
    subscribers.add(subscriber);

}

然后,我有一个 WorkQueue 处理器(应该是消费者):

WorkQueueProcessor<String> sink = WorkQueueProcessor.create();

// Here I subscribe to my publiser
publisher.subscribe(sink);

// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);

// Here I perform a number of stream transformations 

// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

它工作正常,但很难看。 In this example取自 Spring Guides,他们使用 EventBus 将事件从发布者路由到消费者,但是,当我尝试将其与我的处理器链接时,我收到以下编译器错误:

eventBus.on($("quotes"),sink);

The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)

我在这里迷失了,将发布商与处理者联系起来的最佳方式是什么?您会推荐使用 EventBus 吗?如果是这样,正确的调用是什么?

谢谢!

最佳答案

如果您使用 EventBus,您将通过以下方式发布您的台词

 eventBus.notify("quotes", Event.wrap(line);

并通过订阅

eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);

其中“e”是事件类型

关于java - 如何正确使用 Reactor Publisher,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35960032/

相关文章:

Java:如何为 double 值设置精度?

java - 使用 bouncy caSTLe 生成 dsa/elgamal key 对,在 GPG 中导入时不会出错

java - Spring 中的服务器发送事件客户端示例

java - 如何在java spring中运行预定的作业?

python - 将收到的数据(从 Twisted)写入 tkinter 文本框

react 堆测试失败

python - 如何在按下 Ctrl+c 时以干净的方式杀死 python 程序(包括 ROS 和扭曲协议(protocol))?

java - 单击鼠标交换按钮图标

java - 获取一个数组,判断该数字是否更接近右/左零

java - 从intellij上的tomcat获取错误404