c++ - ActiveMQ主从故障转移丢失消息

标签 c++ activemq activemq-cpp

在发生故障转移时,ActiveMQ 会丢失大量消息(仅在主题上出现)。生产者向主题写入 1000 条消息,与此同时消费者从同一主题读取。在这个过程进行到一半时,我关闭了 ActiveMQ master,之后由 ActiveMQ slave 接管并继续处理。切换完成后,有大量消息丢失(约 100 条)。我正在开发的产品要求不能丢失消息。我该怎么做才能让主题上的消息持久化? 生产者:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <decaf\lang\Throwable.h>

// Broker URI handed to the connection factory: a failover transport listing
// two brokers, with reconnect options given as query parameters.
std::string _amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";

// Credentials and destination used by the producer.
const std::string _username = "user";
const std::string _password = "pass";
const std::string _host = "localhost";
const std::string _destination = "Test.AMQ.bogcretu.Topic";

// Payload building blocks: CreateMessage() appends _garbageMessage to
// _bodyMessage _multiplyFactor times.
std::string _garbageMessage = "GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9";

// Number of messages main() publishes.
int _countMessages = 1000;
int _multiplyFactor = 100;
std::string _bodyMessage;

// Grows the global _bodyMessage by appending _garbageMessage to it
// _multiplyFactor times (no-op when _multiplyFactor is zero or negative).
void CreateMessage()
{
    int remaining = _multiplyFactor;
    while (remaining-- > 0) {
        _bodyMessage.append(_garbageMessage);
    }
}

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    CreateMessage();
    activemq::core::ActiveMQConnectionFactory factory;
    factory.setBrokerURI(_amqURI);
    std::auto_ptr<cms::TextMessage> message;
    std::auto_ptr<cms::Connection> connection(factory.createConnection(_username, _password));

    connection->start();

    std::auto_ptr<cms::Session> session(connection->createSession());
    std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
    std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));

    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);

    for (int i = 0; i < _countMessages; i++) {
        std::stringstream ss;
        ss << i;
        std::string number = ss.str();
        message.reset(session->createTextMessage(number));
        producer->send(message.get());
        std::cout << i << std::endl;
    }

    //message.reset(session->createTextMessage("DONE"));
    //producer->send(message.get());

    //connection->close();

    //activemq::library::ActiveMQCPP::shutdownLibrary();

    return 0;
}

消费者:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

// Broker URI for the consumer: failover transport over two brokers, with
// reconnect options given as query parameters.
std::string amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        this->_consumer = this->_session->createConsumer(this->_destination);
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onMessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";

            }

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();
}

Consumer_durable:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

// Broker URI for the durable consumer: failover transport over two brokers,
// with reconnect options given as query parameters.
std::string amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        //this->_consumer = this->_session->createConsumer(this->_destination);



        static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
        this->_consumer = this->_session->createDurableConsumer(topic, "sub_name", "");
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onMessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";
            }

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();
}

最佳答案

如果您希望消息被持久化,就应该使用队列,或者使用持久主题订阅(durable topic subscription)。无论生产者的投递模式是否为持久,主题本身都不会持久化消息。事实上,如果向一个没有任何消费者订阅的主题发送消息,该消息会被直接丢弃。同样,ActiveMQ 中用于控制主题的"固定挂起消息限制"(constant pending message limit)配置,也会在消费者处理速度跟不上时丢弃该主题上较旧的消息,因为主题的服务保证级别较低。

如果您希望消息被写入存储,并在代理(broker)发生故障转移后得以恢复,则需要使用队列(Queue)并在生产者上设置持久投递模式;或者确保事先已经存在一个持久主题订阅,并使用设置了持久标志的生产者发送消息。

关于c++ - ActiveMQ主从故障转移丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58731527/

相关文章:

c++ - 如何根据动态属性对项目进行排序?

c++ - STL算法类

java - Apache Camel-消息重新传递发生在异常 block 执行之前

C# 'String' 到 C++ 'std::string'

c++ - 对 std::string 使用 operator [] 是否安全

ActiveMQ 和 maxPageSize

java - 从 Camel 路由发送响应

ssl - activemq-cpp c++ 客户端如何使用 ssl url 连接服务器

c++ - HornetQ,消费者找不到队列