spring - 嵌入式代理和故障转移消费者的 Activemq 集群

标签 spring activemq cluster-computing

环境

  • 现有的软件具有专有的集群方式,应将其移至使用 JMS
  • 客户不想为设置和维护 消息系统,所以只有当我能嵌入整个系统时才能使用它 向现有虚拟机发送消息
  • 代理实例和消费者应该在同一个 JVM 中。消费者 应该能够在故障转移情况下连接到远程代理,因为所有消费者无论在哪个 JVM 中运行,都应该有一个输入队列。
  • 如果消费者使用直接方法调用 与本地经纪人沟通

演示项目

我用 ActiveMQ + Maven + Spring 创建了一个非常简单的演示 (eclipse) 项目(整个项目位于 http://www.woofiles.com/dl-279452-fOcsWkcm-activemq.zip)。如果您尝试这样做,请更改 activemq 的 dataDirectory,因为到目前为止它是一个有线绝对路径。

我尝试从 spring 启动一个代理,以及一组消费者。请参阅下面的 Spring 配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_FALLBACK" />
    </bean>
        <bean id="embeddedBroker" class="org.apache.activemq.broker.BrokerService"
            destroy-method="stop" init-method="start" >
            <property name="brokerName" value="conversion" />
            <property name="dataDirectory"
                value="c:\eclipseWithMaven\activemq\working\conversion" />
            <property name="schedulerSupport" value="false" />
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://127.0.0.1:600${idOfClusterNode}</value>
                </list>
            </property>
            <property name="managementContext">
                <bean class="org.apache.activemq.broker.jmx.ManagementContext">
                    <property name="connectorPort" value="201${idOfClusterNode}" />
                </bean>
            </property>
        </bean>

    <!-- depends-on see why at http://activemq.apache.org/vm-transport-reference.html -->
    <!--depends-on="embeddedBroker" -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="embeddedBroker">
        <property name="brokerURL">
            <value>failover:(vm:/conversion,tcp://127.0.0.1:6001,tcp://127.0.0.1:6002)</value>
        </property>
    </bean>
    <bean id="cachedConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory"
        p:targetConnectionFactory-ref="jmsFactory" p:sessionCacheSize="10" />
    <bean id="container"
        class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="concurrentConsumers" value="10" />
        <property name="connectionFactory" ref="cachedConnectionFactory" />
        <property name="messageListener" ref="conversion" />
        <property name="destination" ref="conversionInputQueue" />
    </bean>
    <bean id="conversionInputQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="conversionInputQueue" />
    </bean>
    <bean id="conversion" class="activemq.Conversion"
        p:clusterId="${idOfClusterNode}" />
</beans>

我只是尝试使用 spring 和 log4j 配置使用的不同参数启动 activemq.ConversionDemo 类的一个或两个实例。运行配置的环境条目如下所示:

  • 实例 1:-DidOfClusterNode=1 -DidOfOtherClusterNode=2 -DlogFile=conversion1.log
  • 实例 2:-DidOfClusterNode=2 -DidOfOtherClusterNode=1 -DlogFile=conversion2.log

如果我启动一个实例,就没问题。如果两个运行出现以下问题:

  • 第二个代理根本不会启动。它说它没有锁。很好,但我想,它只是异步启动一个线程,并将控制权交还给 spring。但似乎,它不会让 Spring 继续下去。
  • SimpleMessageListenerContainer 似乎也控制着控制权 整个 Spring ,直到所有消费者都启动。

我想要什么

  • 我想满足以上条件
  • 我想我必须异步启动代理和消费者, 我真的不能用这个配置在 Spring 做
  • 在代理之间实现真正的负载平衡会很好。看来,ActiveMQ 只为故障转移做好了准备。
  • 如果ActiveMQ不能满足我的需求,请推荐 其他免费解决方案。

如果您需要更多信息,请告诉我。

编辑

我认为 ActiveMQ 支持我的需求,我只需要了解“网络代理”。所以我想我必须有两个文件存储和一个来 self 的两个代理的网络。

最佳答案

如果您将 2 个代理指向同一个文件存储,那么第一个将获得锁,第二个将阻塞直到锁被释放...这是一个 shared file system master/slave设置

如果你想要一个主动/主动设置,那么你需要使用单独的文件存储并将它们连接在一起作为 network of brokers

关于spring - 嵌入式代理和故障转移消费者的 Activemq 集群,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8923001/

相关文章:

java - Spring jpa hibernate : read and write data in atomic way

java - 如何从ActiveMQConnectionFactory获取ActiveMQConnection?

java - ActiveMQ命令行异常: "Failed to retrieve RMIServer stub"

Hadoop:为什么我在 Namenode Information Web 中收到 "Max Non Heap Memory is -1 B."消息?这是什么意思?

java - 如何从外部 Controller 运行 Controller 内的方法

java - Spring自定义路径解析静态资源

java - 无法从外部项目获取bean

java - Wildfly activemq 集成 - 消息子系统

apache-spark - 跨集群分布分区

scala - 提交到 Spark 集群时出现 FileNotFoundException