session - 服务总线 - 按序列号从 session 中检索消息

标签 session servicebus azure-pack

我目前正在尝试从 中检索特定消息 session .

为此,我想使用 . 接收 (Int64) 关于 消息 session 我经过的地方序列号的消息。

这是我的代码 -

long msgSequenceNr = 1337;

QueueClient queueClient = QueueClient.CreateFromConnectionString(Constants.ServiceBusConnectionString, Constants.TestQueueEntityName, ReceiveMode.PeekLock);
MessageSession msgSession = queueClient.AcceptMessageSession(Constants.TestSessionId);

var peekedMsg = msgSession.Peek(msgSequenceNr); // <-- Works fine!
var receivedMsg = msgSession.Receive(msgSequenceNr); // <-- MessageNotFoundException

不幸的是,接收将导致 MessageNotFoundException 而 Peek 工作正常。
这是我错过的限制还是有另一种方法来实现这一点。

注意 session 中可能有多条消息

最佳答案

使用 SequenceNumber 接收只能与 Defer 方法结合使用。这是你将如何实现它:

  • 消息已收到,但现在无法处理(可能正在等待其他进程完成)。
  • 在某些持久存储(表存储、SQL 数据库等)中保留 SequenceNumber
  • 当您知道处理可以继续时(例如:依赖进程已完成),从您的持久存储中加载所有 SequenceNumbers。
  • 使用 Receive(int sequenceNumber) 或 ReceiveBatch(int[] sequenceNumbers) 来接收和处理您的延迟消息。

  • 示例应用:https://code.msdn.microsoft.com/windowsazure/Brokered-Messaging-ccc4f879#content

    更新:

    形成您的评论,我注意到“取消延迟”延迟消息可能是一种解决方案。这是一些示例代码,用于取消延迟将延迟消息复制到新消息的消息,完成延迟消息并将新消息发送回队列。这使用 TransactionScope 以事务方式完成并重新发送消息,以避免丢失消息的风险:
        var messageId = "12434539828282";
    
        // Send.
        var msg = new BrokeredMessage {SessionId = "user1", MessageId = messageId };
        msg.Properties.Add("Language", "Dutch");
        queue.Send(msg);
    
        // Receive.
        var session = queue.AcceptMessageSession();
        msg = session.Receive();
    
        // Store the sequence number.
        var sequenceNumber = msg.SequenceNumber;
    
        // Defer.
        msg.Defer();
    
        // Change to true to test if the transaction worked.
        var shouldThrow = false;
    
        // Later processing of deferred message.
        msg = session.Receive(sequenceNumber);
    
        try
        {
            using (var ts = new TransactionScope())
            {
                // Create a new message.
                var undeferredMessage = new BrokeredMessage {SessionId = msg.SessionId, MessageId = msg.MessageId};
                foreach (var prop in msg.Properties)
                    undeferredMessage.Properties.Add(prop);
    
                // Complete and send within the same transaction.
                msg.Complete();
                if (shouldThrow)
                    throw new InvalidOperationException("Some error");
                queue.Send(undeferredMessage);
    
                // Complete the transaction.
                ts.Complete();
            }
        }
        catch (Exception ex)
        {
            msg.Abandon();
        }
    
        if (shouldThrow)
        {
            msg = session.Receive(sequenceNumber);
            Console.WriteLine(msg.MessageId + " should match: " + messageId);
        }
        else
        {
            try
            {
                msg = session.Receive(sequenceNumber);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Message not found, transaction worked OK.");
            }
        }
    

    注意:这里我只是简单地复制了属性。请记住,您可能想要复制正文和任何其他附加信息。

    关于session - 服务总线 - 按序列号从 session 中检索消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26525660/

    相关文章:

    azure - 在 Azure ServiceBus 中使用大量实体是否有缺点

    azure - 本地服务总线 - 放弃延迟消息

    javascript - session 不工作 + 快速

    asp.net-mvc - .NET 自定义成员资格与。自定义登录/注册 : Authentication/Authorization

    c# - Windows 服务总线 - 多个接收器

    直接存储空间

    angularjs - 2个应用程序之间的 session 处理

    php - 在 Blade - Laravel 中使用 session 数据

    java - 如何使用Java创建Azure服务总线队列?