c# - RabbitMQ 和序列化奇怪的错误

标签 c# serialization rabbitmq amqp binary-deserialization

我有两个应用程序,app1.cs 和 app2.cs(下面的代码)。此外,我还有一个从 refer.cs(下面的代码)中提取的 dll。当我编译 app1.cs(发送测量对象)时,出现以下异常:

Unhandled Exception: RabbitMQ.Client.Exceptions.OperationInterruptioedException

我看不到连接是如何中断的。您看到问题出在哪里了吗?

问候, 黛米

//refer.cs from which refer.dll is created

using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace refer
{
    //start alternate serialization
    public static class AltSerialization
    {
        public static byte[] AltSerialize(Measurement m)
        {
         using (var ms = new MemoryStream())
            {
                var bf = new BinaryFormatter();
                bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
                bf.Serialize(ms, m);
                return ms.GetBuffer();
            }
        }

        public static Measurement AltDeSerialize(byte[] seriM)   
        {
        using (var stream = new MemoryStream( seriM ))
            {
                BinaryFormatter bf = new BinaryFormatter();
                bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
                return (Measurement)bf.Deserialize(stream);           
            }
        }
    }
    //end alternte serialization

    [Serializable] //This attribute sets class to be serialized
    public class Measurement : ISerializable
    {             
        [NonSerialized] public int id;
        public int time; //timestamp
        public double value;

        public Measurement()
        {
            id = 1;
            time = 12;
            value = 0.01;
        }

        public Measurement(int _id, int _time, double _value)
        {
            id = _id;
            time = _time;
            value = _value;
        }

        //Deserialization constructor   
        public Measurement(SerializationInfo info, StreamingContext ctxt)
        {
            //Assign the values from info to the approporiate properties   
            Console.WriteLine("DeSerialization construtor called.");
            time = (int)info.GetValue("MeasurementTime", typeof(int));
            value = (double)info.GetValue("MeasurementValue", typeof(double));
        }

       //Serialization function   
        public void GetObjectData(SerializationInfo info, StreamingContext ctxt)
        {
            // Custom name-value pair
            // Values must be read with the same name they're written       
            info.AddValue("MeasurementTime", time);
            info.AddValue("MeasurementValue", value);
        }
    }
}

//MB1.cs

using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;

public interface IMessageBus
{    
string MsgSys       // Property 1
{
    get;
    set;
}

void write (Measurement m1);
Measurement read();
void publish(string queue);   
void subscribe(string queue);   
}

public class Rabbit : IMessageBus
{   
// Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();


public void write ( Measurement m1 )
{
    byte[] body = Measurement.AltSerialize( m1 );

    IConnection connection = factory.CreateConnection();
    IModel channel = connection.CreateModel();

    foreach (string queue in publishQ) 
    {
        channel.BasicPublish("", queue, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", queue);
    }
}

public void publish(string queueName)
{       
    channel.QueueDeclare(queueName, true, false, false, null); //durable=true
    publishQ.Add(queueName); //and, add it the list of queue names to publish to
}

public Measurement read() 
{
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    foreach (string queue in subscribeQ) 
    {
        channel.BasicConsume(queue, true, consumer);
    }   
    System.Console.WriteLine(" [*] Waiting for messages." +
                            "To exit press CTRL+C");
    BasicDeliverEventArgs ea = 
        (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    return Measurement.AltDeSerialize(ea.Body);
}

public void subscribe(string queueName)
{
    channel.QueueDeclare(queueName, true, false, false, null);
    subscribeQ.Add(queueName);
}

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

public Rabbit(string _msgSys) //Constructor
{
    ConnectionFactory factory = new ConnectionFactory();
    factory.HostName = "localhost"; 

    System.Console.WriteLine("\nMsgSys: RabbitMQ");
    MsgSys = _msgSys;
}
}

public class Zmq : IMessageBus
{
public void write ( Measurement m1 )
{
    //
}
public Measurement read() 
{
    //
    return null;
}
public void publish(string queue)
{
//
}
public void subscribe(string queue)
{
//      
}   

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

// Implementation of methods for Zmq class go here
public Zmq(string _msgSys) //Constructor
{
    System.Console.WriteLine("ZMQ");
    MsgSys = _msgSys;
}
} 

public class MessageBusFactory
{
public static IMessageBus GetMessageBus(string MsgSysName)
{
    switch ( MsgSysName )
    {
        case "Zmq":
            return new Zmq(MsgSysName);
        case "Rabbit":
            return new Rabbit(MsgSysName);
        default:
            throw new ArgumentException("Messaging type " +
                MsgSysName + " not supported." );
    }
}
}

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Create a new Measurement message
    Measurement m1 = new Measurement(2, 2345, 23.456);

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    System.Console.WriteLine("With Test message:\n    ID: {0}", m1.id);
    System.Console.WriteLine("    Time: {0}", m1.time);
    System.Console.WriteLine("    Value: {0}", m1.value);

    // Ask queue name and store it
    System.Console.WriteLine("Enter a queue name to publish the message to: ");
    string QueueName = (System.Console.ReadLine()).ToString();
    obj1.publish( QueueName );

    System.Console.WriteLine("Enter another queue name: ");
    QueueName = (System.Console.ReadLine()).ToString();
    obj1.publish( QueueName );

    // Write message to the queue
    obj1.write( m1 ); 

}
}

//MB2.cs

using System; 
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;

public interface IMessageBus
{    
string MsgSys       // Property 1
{
    get;
    set;
}

void write (Measurement m1);
Measurement read();
void publish(string queue);   
void subscribe(string queue);   
}

public class Rabbit : IMessageBus
{   
// Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();


public void write ( Measurement m1 )
{
    byte[] body = Measurement.AltSerialize( m1 );

    IConnection connection = factory.CreateConnection();
    IModel channel = connection.CreateModel();

    foreach (string queue in publishQ) 
    {
        channel.BasicPublish("", queue, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", queue);
    }
}

public void publish(string queueName)
{       
    channel.QueueDeclare(queueName, true, false, false, null); //durable=true
    publishQ.Add(queueName); //and, add it the list of queue names to publish to
}

public Measurement read() 
{
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    foreach (string queue in subscribeQ) 
    {
        channel.BasicConsume(queue, true, consumer);
    }   
    System.Console.WriteLine(" [*] Waiting for messages." +
                            "To exit press CTRL+C");
    BasicDeliverEventArgs ea = 
        (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    return Measurement.AltDeSerialize(ea.Body);
}

public void subscribe(string queueName)
{
    channel.QueueDeclare(queueName, true, false, false, null);
    subscribeQ.Add(queueName);
}

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

public Rabbit(string _msgSys) //Constructor
{
    ConnectionFactory factory = new ConnectionFactory();
    factory.HostName = "localhost"; 

    System.Console.WriteLine("\nMsgSys: RabbitMQ");
    MsgSys = _msgSys;
}
}


public class Zmq : IMessageBus
{
public void write ( Measurement m1 )
{
    //
}
public Measurement read() 
{
    //
    return null;
}
public void publish(string queue)
{
//
}
public void subscribe(string queue)
{
//      
}   

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

// Implementation of methods for Zmq class go here
public Zmq(string _msgSys) //Constructor
{
    System.Console.WriteLine("ZMQ");
    MsgSys = _msgSys;
}
} 

public class MessageBusFactory
{
public static IMessageBus GetMessageBus(string MsgSysName)
{
    switch ( MsgSysName )
    {
        case "Zmq":
            return new Zmq(MsgSysName);
        case "Rabbit":
            return new Rabbit(MsgSysName);
        default:
            throw new ArgumentException("Messaging type " +
                MsgSysName + " not supported." );
    }
}
}

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    System.Console.WriteLine("Enter a queue to subscribe to: ");
    string QueueName = (System.Console.ReadLine()).ToString();
    obj1.subscribe( QueueName );

    //Create a new Measurement object m2
    Measurement m2 = new Measurement(); 

    //Read message into m2
    m2 = obj1.read();
    m2.id = 11;
    System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}",QueueName, m2.id);
    System.Console.WriteLine("    Time: {0}", m2.time);
    System.Console.WriteLine("    Value: {0}", m2.value);
}
}

最佳答案

我刚刚在同一个项目中使用 Refer.cs 和 App1.cs 创建了一个 Vanilla C# VS2010 控制台应用程序项目。

我做了以下更改:

  • 添加了 RabbitMQ.Client.dll
  • 删除了 AssemblyVersion 属性
  • 在 App1.cs 的 Main 方法中添加了 string[] args

另外,我改变了:

factory.HostName = "localhost";

对此:

factory.HostName = "192.168.56.101";

这是我运行 rabbitmq-server 的 VirtualBox Ubuntu VM 的 IP 地址。没有抛出异常,服务器成功接收到消息。

所有迹象都指向服务器配置与给定的内容。我的猜测是您的 rabbitmq-server 根本没有运行,它没有在本地主机上运行,​​或者端口 5672 存在某种连接问题。

关于c# - RabbitMQ 和序列化奇怪的错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6658997/

相关文章:

java - protobuf3 将 java 对象序列化为 json ,该对象有一个 int32 字段,值为 0

java - @ResponseBody 使用 ArrayLists 序列化错误

rabbitmq - 在 flask-socketio 处理程序中调用 rabbitmq channel.start_consuming() 后无法发出

php - 实现事件提要、消息队列或 RDBMS 或 NoSQL 数据库的最佳(可扩展、快速、可靠)方法是什么?

c# - Mvc Donut Caching 以编程方式禁用缓存

c# - wpf中静态数据放在哪里?

java - 在没有在机器上安装 C++ 的情况下在 java 中使用 protobuf

linux - 使用 Ctrl+C 退出后重新连接到 linux 服务输出

c# - window.external 是否同步

C# MySQL无法连接到ubuntu服务器