c++ - AMQP-CPP : Broken pipe error in TCP Handler

标签 c++ events tcp rabbitmq amqp

不幸的是,在我的项目中,我总是在事件处理程序的 onError 函数中结束,并出现错误消息“Broken pipe”。不幸的是,我从来没有进入过 onConnected 状态。 Event Handler 中的 monitor Funktion 被 Flag AMQP::readable 调用了两次。之后,它在没有设置标志的情况下被调用,那是我的管道坏掉的时候。

这是我在代码中所做的。

首先我打开连接:

int Communicator_RabbitMQ::Open(string device)
{

    AMQP::Address address(AMQP::Address("amqp://test:test@localhost/"));

            // make a connection
    m_connection = std::make_shared< AMQP::TcpConnection> (&oCommunicator_RabbitMQ_Handler, address);


    // we need a channel too
    m_channel = std::make_shared <AMQP::TcpChannel> (m_connection.get());

    m_channel->declareExchange("my-exchange", AMQP::fanout);
    m_channel->declareQueue("my-queue");
    m_channel->bindQueue("my-exchange", "my-queue", "my-routing-key");


    m_channel->declareExchange("cyos_tx_exchange", AMQP::direct);
    m_channel->declareQueue("cyos_queue");
    m_channel->bindQueue("cyos_tx_exchange", "cyos_queue", "");


    return true;
}

然后我在我的线程中循环调用读取函数:

string Communicator_RabbitMQ::Read()
{

    int result = 0;
    int maxfd = 1;


    struct timeval tv
    {
        1, 0
    };

    string returnValue; //Rückgabe der Methode  
    string message;     // Nachricht aus RabbitMQ

    try
    {

        FD_ZERO(&oCommunicator_RabbitMQ_Handler.m_set);
        FD_SET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set);


        if (oCommunicator_RabbitMQ_Handler.m_fd != -1)
        {
            maxfd = oCommunicator_RabbitMQ_Handler.m_fd + 1;
        }

        result = select(FD_SETSIZE, &oCommunicator_RabbitMQ_Handler.m_set, NULL, NULL, &tv);

        if ((result == -1) && errno == EINTR)
        {
            TRACE(L"Error in socket");
        }
        else if (result > 0)
        {
            if (oCommunicator_RabbitMQ_Handler.m_flags & AMQP::readable)
                TRACE(L"Got something");

            if (FD_ISSET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set))
            {
                m_connection->process(oCommunicator_RabbitMQ_Handler.m_fd, oCommunicator_RabbitMQ_Handler.m_flags);

            }
        }

    }
    catch (exception e)
    {
        cout << e.what();
    }

    return "";

}

这是 TCP 事件处理程序:

#pragma once

class Communicator_RabbitMQ_Handler : public AMQP::TcpHandler
{
private:


    /**
    * Method that is called when the connection succeeded
    * @param socket Pointer to the socket
    */
    virtual void onConnected(AMQP::TcpConnection* connection)
    {
        std::cout << "connected" << std::endl;
    }

        /**
         *  When the connection ends up in an error state this method is called.
         *  This happens when data comes in that does not match the AMQP protocol
         *
         *  After this method is called, the connection no longer is in a valid
         *  state and can be used. In normal circumstances this method is not called.
         *
         *  @param  connection      The connection that entered the error state
         *  @param  message         Error message
         */
    virtual void onError(AMQP::TcpConnection* connection, const char* message)
    {
        // report error
        std::cout << "AMQP TCPConnection error: " << message << std::endl;
    }

        /**
         *  Method that is called when the connection was closed.
         *  @param  connection      The connection that was closed and that is now unusable
         */
    virtual void onClosed(AMQP::TcpConnection* connection)
    {
        std::cout << "closed" << std::endl;
    }


        /**
         *  Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
         *  @param  connection  The TCP connection object that is reporting
         *  @param  fd          The filedescriptor to be monitored
         *  @param  flags       Should the object be monitored for readability or writability?
         */
    virtual void monitor(AMQP::TcpConnection* connection, int fd, int flags)
    {
        //TRACE(L"Communicator_RabbitMQ_Handler, monitor called, %d, %d, %x", fd, flags, &m_set);
        // we did not yet have this watcher - but that is ok if no filedescriptor was registered
        if (flags == 0) 
            return;


        if (flags & AMQP::readable)
        {
            FD_SET(fd, &m_set);
            m_fd = fd;
            m_flags = flags;
        }
    }



public:
    Communicator_RabbitMQ_Handler() = default;

    int m_fd = -1;
    int m_flags = 0;
    fd_set m_set;

};

RabbitMQ 日志条目:

2018-07-02 07:04:50.272 [info] <0.9653.0> accepting AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672)
2018-07-02 07:04:50.273 [warning] <0.9653.0> closing AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672):
{handshake_timeout,handshake}

最佳答案

我最终通过在 rabbitmq.config 文件中将 hanshake 超时增加到 20 秒解决了这个问题。我刚刚在该文件中添加了以下内容:

handshake_timeout = 20000

该值以毫秒为单位给出,默认值为 10 秒,这对我的解决方案来说似乎不够。

关于c++ - AMQP-CPP : Broken pipe error in TCP Handler,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51147754/

相关文章:

c++ - 简单 C++ 程序中的段错误

debugging - : "JDI Event Dispatch" java. lang.NullPointerException 期间发生内部错误

Java TCP 套接字编程 - 在服务器上构建大字符串以在客户端上打印

c - TCP 套接字上的 select() 仅适用于第一个循环

java - 从 java socket(TCP) 读取 19000 字节

c++ - 如何使用 enter(回车)终止无限循环

c# - C++中指针和C#中引用类型的区别

c# - 从表单处理类中的自定义事件

c++ - 在一行中初始化一对 vector

javascript - TouchEvent非法构造函数