node.js - 如何使Azure Functions按顺序连续消费/处理Azure Service Bus Queue消息?

标签 node.js azure azure-functions message-queue azureservicebus

(免责声明:我对消息队列和 Azure Functions 都不熟悉)

我有一个 Azure 服务总线队列。它使用了 FIFO 技术,这正是我所需要的。我还成功设置了一项不断将消息推送到队列中的作业。我现在队列中有大约 100K 条消息,它们已按正确的顺序排列

现在,我需要按顺序使用/处理这些消息。每条 Message 的处理时间大约为 1 秒。但当涉及到 self 处理时,我只知道使用Azure Functions中的“计时器函数”的方法。但同样,当我使用“计时器功能”时,我需要每 5 分钟运行一次,然后每次需要拉取大约 300 条消息。我的代码如下所示(我使用的是 NodeJS):

函数.json

{
  "bindings": [
    {
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 */5 * * * *"
    }
  ]
}

index.js

..
..
let allMessages = [];   
while (allMessages.length < 300) {
    const messages = await sbReceiver.receiveMessages(300, {
        maxWaitTimeInMs: 60 * 1000,
    });
    
    if (!messages.length) {
        break;
    }
    
    allMessages.push(...messages);
    
    for (let message of messages) {
        
        await axios({
            /**
             * PROCESS SINGLE MESSAGE HERE
             * TAKES ABOUT 1 SEC EACH
             */
        })
        .then(async function (response) {
            await sbReceiver.completeMessage(message);
        })
        .catch(function (error) {
            break;
        });
        
    }
}
..
..

这种方法可以很好地完成工作。但我只是不满意必须通过预定作业(计时器函数)进行处理的想法。

问题

除了使用“计时器函数”之外,是否有一种方法(或)如何使 Azure 函数(NodeJS)始终保持自行消费/处理排队消息,同时确保消息是按顺序处理的吗?

这里的要点是:

  1. 继续消费/处理消息,最好不使用计时器
  2. 按顺序使用/处理消息

编辑: 我知道“Azure 服务总线队列触发器”,但我不认为这些触发器会等待上一个作业“完成处理”。因为我的处理涉及到另一方的API调用,只需按顺序依次完成即可。我认为队列触发器无论如何都会在新消息到达后立即被触发,这会扰乱另一端的排序。如果我在这个概念上有误,请纠正我。

预先感谢您的好意建议。

最佳答案

发布我的评论作为社区的答案。

队列到达服务总线后,您可以直接使用 Azure 服务总线队列触发器来处理和延迟消息。您可以在 Node.js 代码中使用延迟方法 deferMessage() 以及服务总线队列触发器。一旦新消息到达队列。 Azure服务总线队列触发器将触发该函数,然后您可以添加延迟消息代码块来延迟消息并处理它。

您可以引用这个Github repository中的defer.js代码示例代码

示例服务总线 js 函数:-

index.js:-


module.exports = async function(context, mySbMsg) {
    context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
};

const { ServiceBusClient, delay } = require("@azure/service-bus");

// Load the .env file if it exists
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString = "Endpoint=sb://siliconservicebus76.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xcMOcbMJAPnPM+LTl+zRq4Ix2EjHQFkrs+ASbBXJ2v4=";
const queueName = "test";

async function main() {
  await sendMessages();
  await receiveMessage();
}

// Shuffle and send messages
async function sendMessages() {
  const sbClient = new ServiceBusClient(connectionString);
  // createSender() can also be used to create a sender for a topic.
  const sender = sbClient.createSender(queueName);

  const data = [
    { step: 1, title: "Shop" },
    { step: 2, title: "Unpack" },
    { step: 3, title: "Prepare" },
    { step: 4, title: "Cook" },
    { step: 5, title: "Eat" },
  ];
  const promises = new Array();
  for (let index = 0; index < data.length; index++) {
    const message = {
      body: data[index],
      subject: "RecipeStep",
      contentType: "application/json",
    };
    // the way we shuffle the message order is to introduce a tiny random delay before each of the messages is sent
    promises.push(
      delay(Math.random() * 30).then(async () => {
        try {
          await sender.sendMessages(message);
          console.log("Sent message step:", data[index].step);
        } catch (err) {
          console.log("Error while sending message", err);
        }
      })
    );
  }
  // wait until all the send tasks are complete
  await Promise.all(promises);
  await sender.close();
  await sbClient.close();
}

async function receiveMessage() {
  const sbClient = new ServiceBusClient(connectionString);

  // If receiving from a subscription, you can use the createReceiver(topicName, subscriptionName) overload
  let receiver = sbClient.createReceiver(queueName);

  const deferredSteps = new Map();
  let lastProcessedRecipeStep = 0;
  try {
    const processMessage = async (brokeredMessage) => {
      if (
        brokeredMessage.subject === "RecipeStep" &&
        brokeredMessage.contentType === "application/json"
      ) {
        const message = brokeredMessage.body;
        // now let's check whether the step we received is the step we expect at this stage of the workflow
        if (message.step === lastProcessedRecipeStep + 1) {
          console.log("Process received message:", message);
          lastProcessedRecipeStep++;
          await receiver.completeMessage(brokeredMessage);
        } else {
          // if this is not the step we expected, we defer the message, meaning that we leave it in the queue but take it out of
          // the delivery order. We put it aside. To retrieve it later, we remeber its sequence number
          const sequenceNumber = brokeredMessage.sequenceNumber;
          deferredSteps.set(message.step, sequenceNumber);
          console.log("Defer received message:", message);
          await receiver.deferMessage(brokeredMessage);
        }
      } else {
        // we dead-letter the message if we don't know what to do with it.
        console.log(
          "Unknown message received, moving it to dead-letter queue ",
          brokeredMessage.body
        );
        await receiver.deadLetterMessage(brokeredMessage);
      }
    };
    const processError = async (args) => {
      console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
    };

    receiver.subscribe(
      { processMessage, processError },
      {
        autoCompleteMessages: false,
      }
    ); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
    await delay(10000);
    await receiver.close();
    console.log("Total number of deferred messages:", deferredSteps.size);

    receiver = sbClient.createReceiver(queueName);
    // Now we process the deferred messages
    while (deferredSteps.size > 0) {
      const step = lastProcessedRecipeStep + 1;
      const sequenceNumber = deferredSteps.get(step);
      const [message] = await receiver.receiveDeferredMessages(sequenceNumber);
      if (message) {
        console.log("Process deferred message:", message.body);
        await receiver.completeMessage(message);
      } else {
        console.log("No message found for step number ", step);
      }
      deferredSteps.delete(step);
      lastProcessedRecipeStep++;
    }
    await receiver.close();
  } finally {
    await sbClient.close();
  }
}

main().catch((err) => {
  console.log("Deferral Sample - Error occurred: ", err);
  process.exit(1);
});

module.exports = { main };

输出:-

enter image description here

关于node.js - 如何使Azure Functions按顺序连续消费/处理Azure Service Bus Queue消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76626519/

相关文章:

javascript - 使用 JavaScript 和 Jade 添加到 localStorage

node.js - 如何插入 [-XXX.XXXXXXX, XX.XXXXXX] 格式的定位坐标?

node.js - `npm link --save` 不更新对我的 package.json 的依赖

node.js - 从 Heroku 的子目录运行 npm

Azure AD B2C 忽略自定义 HTML 页面内容

c# - (C#)Azure 函数 - 具有存储帐户绑定(bind)时禁用 Nagle 算法

node.js - 如何在Azure Functions node.js中获取客户端IP地址?

python - 在 Azure Devops 中创建 pull 请求(使用 REST API)不会链接工作项

针对 VM 的 Azure 负载均衡器入站 NAT 规则

azure - 在没有flask的情况下在azure中运行python函数