.net - RabbitMQ Pub/Sub : Closing the last consumer closes the publisher's channel/model. 为什么?

标签 .net c#-4.0 rabbitmq publish-subscribe

我正在使用 .Net RabbitMQ 来处理一些 pub/sub(发布者/订阅者)代码。一切正常,直到我开始关闭消费者。消费者正确处理发布的数据,直到我关闭最后一个消费者。在所有消费者之后,我打开了一个新的消费者,但没有任何反应。应用程序打开,但没有收到来自发布者的任何数据。

我检查了发布者代码,发现当最后一个消费者关闭时,其 channel 的 IsOpen 属性变为 false。我不知道是否有一些设置可以在消费者关闭后保持 channel 打开。

这是我的发布商代码: 编辑我最初粘贴了错误的代码。

这是我的消费者代码:

public MyConsumer
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly Timer _timer;

private SubscriptionConsumerType(string ipAddress, string exchangeName, TimeSpan tsPullCycle)
{
    //set up connection
    this._factory = new ConnectionFactory();
    this._factory.HostName = ipAddress;
    this._connection = this._factory.CreateConnection();
    this._channel = this._connection.CreateModel();

    //set up and bind the exchange
    this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());
    string queueName = this._channel.QueueDeclare().QueueName;
    this._channel.QueueBind(queueName, exchangeName, "");

    //start consuming
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(this._channel);
    this._channel.BasicConsume(queueName, true, consumer);

    //periodically check for new messages from the publisher
    this._timer = new Timer(new TimerCallback(this.TimerStep), consumer, tsPullCycle, tsPullCycle);
}

public void Dispose()
{
    if (this._timer != null)
        this._timer.Dispose();

    if (this._channel != null)
    {
        this._channel.Close();
        this._channel.Dispose();
    }

    if (this._connection != null)
    {
        this._connection.Close();
        this._connection.Dispose();
    }
}
}

现在,我的解决方法是始终在某处打开消费者窗口。但理想情况下,我希望我的发布程序能够运行,无论打开的消费者窗口数量如何。谢谢。

编辑糟糕,我粘贴了错误的生产者代码。这是:

private SubscriptionBroadcastType(string ipAddress, string exchangeName)
{
    this._factory = new ConnectionFactory();
    this._factory.HostName = ipAddress;
    this._connection = this._factory.CreateConnection();
    this._channel = this._connection.CreateModel();

    this._exchangeName = exchangeName;
    this._channel.ExchangeDeclare(exchangeName, SubscriptionBroadcastType.BROADCAST, SubscriptionBroadcastType.DURABLE, SubscriptionBroadcastType.AUTO_DELETE, new Dictionary<string, object>());
}

public void BroadcastMessage(string message)
{
    lock (this._syncroot) //protect _channel
    {
        if (this._channel.IsOpen)
            this._channel.BasicPublish(this._exchangeName, "", null, System.Text.Encoding.UTF8.GetBytes(message));
    }
}

最佳答案

我认为你可能有一些根本性的错误。请检查您是否发布了正确的代码。正如我所读到的,您有一个生产者创建一个特定的命名队列并直接发布到该队列。您有一个消费者创建一个特定的命名交换,然后创建一个动态命名的新队列并将其绑定(bind)到该交换。然后您将从该队列中读取数据,但它不能是您最初发布到的队列。

我会首先修复您的代码,以在您的发布者代码中添加创建一个交换,其中包含您可以在消费者代码中访问的特定名称。此行将出现在生产者线程中,而不是队列声明行中:

this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());

然后您将需要发布到该交换器,因此不要将发布到队列的行更改为:

this._channel.BasicPublish(exchangeName, "", this._basicProperties, System.Text.Encoding.UTF8.GetBytes(message));

您的消费者应该设置得很好,可以按原样从队列中接收这些消息。看看这是否有助于解决您的问题。

关于.net - RabbitMQ Pub/Sub : Closing the last consumer closes the publisher's channel/model. 为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10882093/

相关文章:

.net - 从 .NET 中的路径中删除点

python - aio_pika 的健壮连接未重新连接

c# - 使用 CaSTLe windsor 创建对象而不是工厂类

node.js - 通过远程数据库或rabimtMQ发送db数据

azure - RabbitMQ 使用哪种云?

c# - .NET 应用程序 - 如何隐藏依赖库的实现?

c# - 关于 <FIELDSET> <LABEL> 标签而不是使用 TABLE 的文档?

c# - 如何从字符串生成唯一整数?

wpf - 从合并序列中删除 IObservable

ios - PushSharp Push 在停止之前不会处理队列