rabbitmq - EasyNetQ does not reconnect after the RabbitMQ server has been unreachable

Tags: rabbitmq easynetq

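I am testing some edge cases in my software, so I set up a very simple test environment:

  • A RabbitMQ server running on CentOS 7
  • A message consumer written in C# targeting .NET Core 2.1, running on CentOS 7
  • A message sender written in C# targeting .NET Core 2.1, running on CentOS 7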

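I send a simple text message every 5 seconds. The sender and the consumer run on the same UNIX machine, while the RabbitMQ server runs on a different machine in the network. So far, everything works fine. Now I stop my RabbitMQ server with systemctl stop rabbitmq-server.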

I get errors on both the sender and the consumer, which is expected.

Then I restart the RabbitMQ server with systemctl start rabbitmq-server.

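Now the fun begins! The sender recovers and continues sending messages, but the consumer does not recover and receives no messages. The messages pile up on the RabbitMQ server!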

Here are the log entries from the sender (working as expected):

2019-01-22 21:18:25.628 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
   at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
   at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
   at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
   at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
   at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.632 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to any Broker. Retrying in 00:00:05
2019-01-22 21:18:35.444 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 7.' | Num: 7 | Guid: 5345c7e4-61e6-4c79-8179-d4bef7864420'.
2019-01-22 21:18:40.452 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 8.' | Num: 8 | Guid: 3cd8635c-cdfa-45f3-8495-2acb0713d47b'.
2019-01-22 21:18:45.457 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 9.' | Num: 9 | Guid: 099462b8-cd66-40b9-ac10-89c3246819ec'.
2019-01-22 21:18:50.470 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 10.' | Num: 10 | Guid: c25139b2-8e45-4771-9544-830014382e0c'.
2019-01-22 21:18:55.515 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 11.' | Num: 11 | Guid: 90049d91-3805-4aaa-ac18-b61c09164afd'.
2019-01-22 21:19:00.526 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 12.' | Num: 12 | Guid: 108ff318-6a34-4e64-94bd-dafa67aa6717'.

This shows the last error message, after which you can see that EasyNetQ has recovered and can deliver messages again.

The message consumer does not work! Here are its log entries:

2019-01-22 21:18:25.623 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
   at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
   at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
   at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
   at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
   at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.625 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to any Broker. Retrying in 00:00:05

It just sits here, waiting forever! It looks like something is deadlocked while the messages pile up on the RabbitMQ server.

When I stop my consumer application and start it again, the messages are picked up.

Both applications (sender and consumer) use the following code to connect:

private static IBus SetupRabbitMqConnection(string rabbitServer, string rabbitVHost, ushort rabbitPort, string rabbitUser, string rabbitPwd)
{
    Log.Logger.Debug($"Creating a connection to RabbitMQ server '{rabbitServer}' on port {rabbitPort.ToString()} " +
                     $"using the EasyNetQ library....");

    try
    {
        var connStr = $"host={rabbitServer}:{rabbitPort.ToString()};virtualHost={rabbitVHost};username={rabbitUser};" +
            $"password={rabbitPwd};publisherConfirms=true;timeout=30;prefetchcount=1;requestedHeartbeat=30";
        var msgBus = RabbitHutch.CreateBus(connStr, x => { });
        if (!msgBus.IsConnected)
        {
            var errMsg = $"Currently not connected to RabbitMQ server '{rabbitServer}'.";
            Log.Logger.Error(errMsg);
        }
        else
        {
            Log.Logger.Debug("Successfully connected to RabbitMQ server.");
        }
        return msgBus;
    }
    catch (Exception ex)
    {
        Log.Logger.Error($"Error to establish a connection to RabbitMQ server '{rabbitServer}'. Error: {ex}");
        throw;
    }
}

The consumer program registers its listener as follows:

var msgBus = SetupRabbitMqConnection(rabbitServer, vhost, rabbitPort, rabbitUser, rabbitPwd);
RegisterMsgSubscriptions(msgBus);

private static void RegisterMsgSubscriptions(IBus msgBus)
{
    Log.Logger.Debug("Starting to register RabbitMQ message subscriptions...");
    try
    {
        #region Queue declarations

        var advancedBus = msgBus.Advanced;
        var testQueueOne = new EasyNetQ.Topology.Queue(TestQueueOneName, true);

        Log.Logger.Debug("Finished declaring queues.");

        #endregion

        #region Message Queue Handler registrations

        advancedBus.Consume(testQueueOne, registration => registration
            .Add<RabbitMessage<TestTextMessageDto>>(MessageProcessor.ProcessRabbitTestMessage));

        Log.Logger.Debug("Finished registration of message handlers for several queues.");

        #endregion
    }
    catch (Exception ex)
    {
        Log.Logger.Error($"Error registering message handler. Error: {ex}");
    }
}

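Any idea what could be wrong here? In production we have more than 100 servers consuming messages. These servers are spread all over the country, and the RabbitMQ server is in a data center. So if the connection is lost, the consuming servers must recover, otherwise this is unusable!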

Best answer

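Just put the subscription logic in a handler for the Connected event (it fires every time a connection to the broker is established).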

advancedBus.Connected += (sender, args) => { 
   // subscribe logic
};

Of course, you should take care that the subscription logic is not invoked twice for the same connection, and so on.
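One way to handle the "not twice" caveat is sketched below. This is only an illustration, not code from the answer: ResubscribeGuard is a hypothetical helper that is not part of EasyNetQ. It assumes the subscribe call returns an IDisposable (as advancedBus.Consume appears to in the question's code) and that it is acceptable to drop the previous consumer whenever the broker connection is re-established.

```csharp
using System;

// Hypothetical helper (not part of EasyNetQ): keeps at most one live
// subscription and replaces it each time the broker connection comes back.
public sealed class ResubscribeGuard : IDisposable
{
    private readonly Func<IDisposable> _subscribe;
    private readonly object _gate = new object();
    private IDisposable _current;
    private bool _disposed;

    public ResubscribeGuard(Func<IDisposable> subscribe)
    {
        _subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
    }

    // Number of times the subscribe delegate has run; handy for logging.
    public int SubscribeCount { get; private set; }

    // Call from the Connected handler, and once at startup if already connected.
    public void OnConnected()
    {
        lock (_gate)
        {
            if (_disposed) return;
            try
            {
                // Drop the consumer that belonged to the previous connection.
                _current?.Dispose();
            }
            catch (Exception)
            {
                // Assumption: a failure here only means the old connection is
                // already gone, so it is safe to continue and subscribe again.
            }
            _current = _subscribe();
            SubscribeCount++;
        }
    }

    public void Dispose()
    {
        lock (_gate)
        {
            if (_disposed) return;
            _disposed = true;
            _current?.Dispose();
            _current = null;
        }
    }
}

// Usage sketch against the question's code (assumes the Connected event and
// the IsConnected property on the advanced bus, as used in this thread):
//
//   var guard = new ResubscribeGuard(() =>
//       advancedBus.Consume(testQueueOne, registration => registration
//           .Add<RabbitMessage<TestTextMessageDto>>(MessageProcessor.ProcessRabbitTestMessage)));
//   advancedBus.Connected += (sender, args) => guard.OnConnected();
//   if (advancedBus.IsConnected) guard.OnConnected();
```

Because the guard replaces the previous subscription under a lock instead of adding a second one, it does not matter if the Connected event and the startup check both fire: only the most recent consumer stays alive.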

Regarding rabbitmq - EasyNetQ does not reconnect after the RabbitMQ server has been unreachable, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54316179/
