queue - 通过 REST 向服务总线队列发送消息并通过 TCP 接收消息?

标签 queue servicebus azureservicebus

我有一个 Azure 服务总线队列。我想使用 REST API 将消息发布到队列,但使用 IIS 托管的 WCF 服务(使用 netMessagingBinding)来接收消息。

有人有演示这一点的资源链接吗?或者是否有人能够提供如何使用 REST POST 将消息推送到队列然后使用 netMessagingBinding 接收的代码示例?

读完这篇文章后,我相信这是可能的:

You can send and receive messages to or from the service using REST or .NET managed API, mixing and matching clients using different protocols in a given scenario. For example, you can send a message to a queue using one protocol and consume it using a different protocol.

http://msdn.microsoft.com/en-us/library/windowsazure/hh780717.aspx

我能够使用 netMessagingBinding 将消息推送到队列并使用 netMessagingBinding 接收消息。我还可以使用 REST POST 将消息推送到队列,然后使用 REST DELETE 从队列中接收和删除消息。我只是无法 REST POST 消息并使用 netMessagingBinding 接收

最佳答案

NetMessagingBinding 始终使用 BinaryMessageEncodingBindingElement+NetMessagingTransportBindingElement 构建 channel 堆栈。如果 ServiceBus 队列/订阅中的 BrokeredMessage 是普通的旧 [text] xml,则 BinaryMessageEncoding 将不起作用,使用 WCF 时,您必须使用带有 TextMessageEncoder 和 NetMessagingTransportBindingElement 的 CustomBinding。

简而言之,您需要使用带有 TextMessageEncodingBindingElement(MessageVersion = None)的 CustomBinding 和 NetMessagingTransportBindingElement,确保 Action=”*”,并在 ServiceBehavior 上设置 AddressFilterMode=Any。

以下是使用 NetMessagingTransportBindingElement 读取普通旧式 XML 消息的两种方法:

解决方案#1 在ServiceContract中使用System.ServiceModel.Channels.Message并调用Message.GetBody()

namespace MessagingConsole
{
    static class Constants {
        public const string ContractNamespace = "http://contoso";
    }

    [DataContract(Namespace = Constants.ContractNamespace)]
    class Record
    {
        [DataMember]
        public string Id { get; set; }
    }

    [ServiceContract]
    interface ITestContract
    {
        [OperationContract(IsOneWay = true, Action="*")]
        void UpdateRecord(Message message);
    }

    [ServiceBehavior(
        AddressFilterMode = AddressFilterMode.Any)] // This is another way to avoid “The message with To ” cannot be processed at the receiver…”
    class TestService : ITestContract
    {
        [OperationBehavior]
        public void UpdateRecord(Message message)
        {
            Record r = message.GetBody<Record>();
            Console.WriteLine("UpdateRecord called! " + r.Id);
        }
    }

    class ServiceProgram
    {
        static void Main(string[] args)
        {
            string solution = "sb://SOMENS";
            string owner = "owner";
            string key = "XXXXXX=";
            string topicPath = "Topic2";
            string subscriptionName = "Sub0";
            TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(owner, key);

            MessagingFactory factory = MessagingFactory.Create(solution, tokenProvider);
            TopicClient sender = factory.CreateTopicClient(topicPath);
            SubscriptionClient receiver = factory.CreateSubscriptionClient(topicPath, subscriptionName, ReceiveMode.ReceiveAndDelete);

            string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>4</Id></Record>";
            BrokeredMessage interopMessage = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(interopPayload)), true);
            sender.Send(interopMessage);

            CustomBinding binding = new CustomBinding(
                new TextMessageEncodingBindingElement { MessageVersion = MessageVersion.None },
                new NetMessagingTransportBindingElement());
            ServiceHost serviceHost = new ServiceHost(typeof(TestService), new Uri(solution));
            ServiceEndpoint endpoint = serviceHost.AddServiceEndpoint(typeof(ITestContract), binding, topicPath + "/Subscriptions/" + subscriptionName);
            endpoint.Behaviors.Add(new TransportClientEndpointBehavior(tokenProvider));
            serviceHost.Open();
            Console.WriteLine("Service is running");
            Console.ReadLine();            
        }
    }
}

解决方案 #2 定义 MessageContract 数据类型以使预期的 Soap 契约与互操作客户端发送的内容相匹配:

namespace MessagingConsole
{
    static class Constants
    {
        public const string ContractNamespace = "http://contoso";
    }

    [DataContract(Namespace = Constants.ContractNamespace)]
    class Record
    {
        [DataMember]
        public string Id { get; set; }
    }

    [MessageContract(IsWrapped=false)]
    class RecordMessageContract
    {
        [MessageBodyMember(Namespace = Constants.ContractNamespace)]
        public Record Record { get; set; }
    }

    [ServiceContract]
    interface ITestContract
    {
        [OperationContract(IsOneWay = true, Action="*")]
        void UpdateRecord(RecordMessageContract recordMessageContract);
    }

    class ServiceProgram
    {
        static void Main(string[] args)
        {
            string solution = "sb://SOMENS";
            string owner = "owner";
            string key = "XXXXXXXXXXXXXX=";
            string topicPath = "Topic2";
            string subscriptionName = "Sub0";
            TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(owner, key);

            MessagingFactory factory = MessagingFactory.Create(solution, tokenProvider);
            TopicClient sender = factory.CreateTopicClient(topicPath);
            SubscriptionClient receiver = factory.CreateSubscriptionClient(topicPath, subscriptionName, ReceiveMode.ReceiveAndDelete);

            string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>5</Id></Record>";
            BrokeredMessage interopMessage = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(interopPayload)), true);
            sender.Send(interopMessage);

            CustomBinding binding = new CustomBinding(
                new TextMessageEncodingBindingElement { MessageVersion = MessageVersion.None },
                new NetMessagingTransportBindingElement());
            ServiceHost serviceHost = new ServiceHost(typeof(TestService), new Uri(solution));
            ServiceEndpoint endpoint = serviceHost.AddServiceEndpoint(typeof(ITestContract), binding, topicPath + "/Subscriptions/" + subscriptionName);
            endpoint.Behaviors.Add(new TransportClientEndpointBehavior(tokenProvider));
            serviceHost.Open();
            Console.WriteLine("Service is running");
            Console.ReadLine();
        }
    }

    [ServiceBehavior(
        AddressFilterMode = AddressFilterMode.Any
        )]
    class TestService : ITestContract
    {
        [OperationBehavior]
        public void UpdateRecord(RecordMessageContract recordMessageContract)
        {
            Record r = recordMessageContract.Record;
            Console.WriteLine("UpdateRecord called! " + r.Id);
        }
    }
}

关于queue - 通过 REST 向服务总线队列发送消息并通过 TCP 接收消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15376297/

相关文章:

ruby-on-rails - 在 Rails 中组织工作进程的最佳方式是什么?

Python多处理,代码继续执行?

c - 队列弹出一些垃圾值

php - 配置变量之间的冲突

azure - 使用 Azure 上的 AppFabric 服务总线实现可靠的角色间通信,观察者模式

Python - 从Azure服务总线主题接收非常慢

asp.net - 我什么时候应该在 ASP.NET 网站中使用服务总线?

azure - 如何使用 Azure 服务总线队列作为服务器/客户端场景

c# - 是否可以在尝试接收消息之前确定消息是否被锁定

azure - 如何确保通过 Azure 服务总线队列接收所有消息?