java - Camel可以用Reactor代替吗,如果可以,那么如何正确完成呢?

标签 java apache-camel jms reactive-programming project-reactor

我想知道 Apache Camel 是否可以被 Reactor 或其他一些 react 流 (RS) 库取代。

我是 Reactor 和 RS 世界的新手,但似乎我设法准备了一些正在监听 JMS 消息的 Flux,尽管我不确定我的方法有多正确(这个项目帮助我做到了 - https://github.com/tonvanbart/gs-messaging-jms-reactive ) 。主要成分:

@Component
public class PublisherMessageListener {

    private final DefaultMessageListenerContainer jmsContainer;

    public PublisherMessageListener(DefaultMessageListenerContainer jmsContainer, EventProcessor processor) {
        this.jmsContainer = jmsContainer;
        publisher().subscribe(processor::process);
    }

    private Flux<String> publisher() {
        return Flux.from(subscriber -> {
            MessageListener listener = message -> {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        subscriber.onNext(textMessage.getText());
                    } catch (Exception e) {
                        subscriber.onError(new RuntimeException(e));
                    }
                }
            };
            jmsContainer.setMessageListener(listener);
        });
    }
}

我使用 jmsTemplate.convertAndSend(DESTINATION_NAME, "{ 'type': 'CUSTOMER', 'id': 1 }") 发送消息,EventProcessor 处理每条消息.

react 部分正确吗?

我还想了解,是否可以在 Reactor 中完成重新传递的异常处理以及消息聚合。这些东西:

onException(RuntimeException.class)
    .maximumRedeliveries(2)
    .to(ERROR_URI);

from(URI)
    .aggregate(expression(this::getKey), AggregationStrategies.useLatest())
    .completionTimeout(DELAY_MILLIS)

我开始使用 Reactor,因为它将在 Spring 的某个时候出现。 Akka Streams 对我来说似乎更面向 Scala(不知道它的 Java 支持)。 Reactor 似乎也比 RxJava 更面向 Java 8,而且我喜欢 Reactor 的 Flux 和 Mono,而不是只有一个 Flowable。但如果您能提供很好的论据来解释为什么特定的库更好,我会很高兴听到它们。

最佳答案

从camel 2.11开始,有一个camel-rx模块可以迭代RxJava API。 http://camel.apache.org/rx.html

关于java - Camel可以用Reactor代替吗,如果可以,那么如何正确完成呢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42960122/

相关文章:

java - 如何在云上使用tomcat 8运行java应用程序(war文件)? & 在哪里?

spring - 多个 Camel 路线处理同一个文件

spring - 在 Spring 中验证 JMS 负载

java - 关闭过程会导致input.readLine()中的CPU使用率很高

java - Hibernate JOIN 结果集

java - 使用 UCanAccess JDBC 驱动程序时格式化日期

java - 使用java对请求进行排队和批处理

java - Camel SpringBoot Rest servlet 连接被拒绝(camel 2.22.0)

java - 使用代理网络中的临时队列的请求/回复模式的 ActiveMQ/Camel 故障转移 - 无法发布到已删除的临时队列

java - 具有不同消息选择器的多个 DefaultMessageListenerContainer 实例