我正在使用与 Azure 服务总线相关的 Rebus 创建概念验证,但是在解析从外部源放入队列的消息时遇到了一些问题。
我收到错误消息:
Received message with empty or absent 'rbs2-msg-id' header!
我浏览了 GitHub,注意到这个列表,说明有人在 RabbitMQ 上遇到了类似的问题,建议使用装饰器:
https://github.com/rebus-org/Rebus/issues/508
但是我不确定如何仅针对消息 ID 执行此操作。
我选择的一个选项实际上是修改 Rebus.AzureTransport 代码来执行此操作:
var messageId = headers.GetValueOrNull(Headers.MessageId);
if (string.IsNullOrEmpty(messageId))
{
messageId = Guid.NewGuid().ToString();
headers[Headers.MessageId] = messageId;
}
但更喜欢替代方案!
我注意到的另一件事是,BrokeredMessage 被放置在 ASB 上,如下所示:
var message = new BrokeredMessage("<xml>This is a test message: " + DateTime.Now+ "</xml>");
Rebus 接收时未正确序列化。我收到以下错误:
Unhandled exception 1 while handling message with ID db13880d-124c-4ed5-993e-96faeca0f140: System.Collections.Generic.KeyNotFoundException: Could not find the key 'rbs2-content-type'
通过重写序列化器,底层消息将呈现为:
@strin3http://schemas.microsoft.com/2003/10/Serialization/?6This is a test message: 06/12/2016 07:44:21
所以我不确定我做错了什么。
提前致谢。
最佳答案
嗯...正如错误消息所示,传入消息不包含足够的信息以便 Rebus 处理它。
此外,Rebus 的 Azure 服务总线传输要求将消息内容作为 Stream
发送,以避免将包含的字节封装在 XML 结构中的成本(这是 Azure 服务的结构)公交车司机默认这样做)。
如果我是您,我可能不会使用 Rebus 从不包含 Rebus 发送的消息的队列中接收消息。或者,我可能会这样做 - 但前提是传入的消息非常容易适应 Rebus 格式。
尽管要实现这一点可能有点麻烦,因为 - 正如您所正确观察到的 - 它需要存在某些 header ,这些 header 可以提示 Rebus 如何在处理尝试之间跟踪它,如何反序列化它,如果 bus.Reply
被调用,在哪里回复等等。所以我仍然可能最终不这样做:)
我建议你做这样的事情:
while(true)
{
var message = GetNextMessageOrNullFromQueue();
if (message == null) continue;
UnwrapMessageAndSendWithRebusToRebusQueue(message);
}
通过这种方式编写您自己的接收循环来接收自定义格式的 Azure 服务总线消息,并将其实际处理(也可能从 XML 文本反序列化为对象?)委托(delegate)给 Rebus 端点。
这通常是与外部事物集成的首选方式,因为它保留了在应用程序外部和内部之间来回桥接的逻辑,而不是渗透到您的应用程序中。
关于azure - Rebus Azure ServiceBus - 源自外部服务的消息缺少 MessageID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40990267/