Azure WebJobs SDK : Not calling message.complete() 在特定 ServiceBus 触发的函数上

标签 azure azure-webjobssdk azure-servicebus-topics

我有一个以下形式的服务总线触发函数:

public static void ProcessJobs([ServiceBusTrigger("topicname", "subscriptionname", AccessRights.Listen)] BrokeredMessage input, [ServiceBus("topic2name", "subscription2name", AccessRights.Send)] out BrokeredMessage output)
{
    output = new BrokeredMessage(input.GetBody<String>());
}

我的用例是,此函数只是从一个主题获取消息并将其推送到另一个主题。我不想在此过程中从源主题中删除消息。

这可以实现吗?

此外,我在哪里可以找到有关 AccessRights 及其如何影响消息访问的更多信息。例如:在上面的示例中,我使用 AccessRights.Listen 从输入主题获取消息,但在这些消息上完成函数调用后,它似乎仍然在“删除”消息。

最佳答案

正如 Jambor 所说,默认行为是在函数成功完成时完成消息(请参阅 documentation ),或者在函数失败时放弃。

您可以在 SDK Repo 中看到此行为的实现查看 MessageProcessor 类的代码:

public virtual async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
    if (result.Succeeded)
    {
        if (!MessageOptions.AutoComplete)
        {
            // AutoComplete is true by default, but if set to false
            // we need to complete the message
            cancellationToken.ThrowIfCancellationRequested();
            await message.CompleteAsync();
        }
    }
    else
    {
        cancellationToken.ThrowIfCancellationRequested();
        await message.AbandonAsync();
    }
}

有趣的一点:这是一个虚函数。

ServiceBusConfiguration 公开 MessagingProvider 属性。

如果您查看 SDK Repo 中默认 MessagingProvider 类的代码,您将看到您可以重写负责创建新 MessageProcessor 的方法:

/// <summary>
/// Creates a <see cref="MessageProcessor"/> for the specified ServiceBus entity.
/// </summary>
/// <param name="entityPath">The ServiceBus entity to create a <see cref="MessageProcessor"/> for.</param>
/// <returns>The <see cref="MessageProcessor"/>.</returns>
public virtual MessageProcessor CreateMessageProcessor(string entityPath)
{
    if (string.IsNullOrEmpty(entityPath))
    {
        throw new ArgumentNullException("entityPath");
    }
    return new MessageProcessor(_config.MessageOptions);
}

这个函数也是虚拟的。

现在您可以创建自己的 MessagingProviderMessageProcessor 实现:

public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
    {
        _config = config;
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        if (string.IsNullOrEmpty(entityPath))
        {
            throw new ArgumentNullException("entityPath");
        }
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    class CustomMessageProcessor : MessageProcessor
    {
        public CustomMessageProcessor(OnMessageOptions messageOptions) : base(messageOptions)
        {
        }

        public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            if (!result.Succeeded)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await message.AbandonAsync();
            }
        }
    }
}

并像这样配置您的 JobHost :

public static void Main()
{
    var config = new JobHostConfiguration();
    var sbConfig = new ServiceBusConfiguration
    {
        MessageOptions = new OnMessageOptions
        {
            AutoComplete = false
        }
    };
    sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
    config.UseServiceBus(sbConfig);
    var host = new JobHost(config);
    host.RunAndBlock();
}

这是这个问题的技术部分......

现在,如果您没有完成消息,该消息将一次又一次地用于相同的功能,直到您达到 MaxDeliveryCount,然后您的消息将成为死信。因此,即使将您的函数设计为幂等,我也很确定这不是您想要的。

也许你应该多解释一下你想要实现的目标?

如果您正在寻找父子队列通信(请参阅问题评论),有一篇很好的文章解释了如何使用 ASB 设计工作流程:

或者,您可以查看Defer BrokerMessage 对象上的方法:

Indicates that the receiver wants to defer the processing for this message.

它将允许您保留父消息,直到处理子消息。

关于Azure WebJobs SDK : Not calling message.complete() 在特定 ServiceBus 触发的函数上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39730942/

相关文章:

visual-studio-2010 - Azure SDK 1.5安装错误

c# - 使用 AspNet SignalR 连接 Redis

Azure 服务总线主题请求与消息

azure - 如何从Azure数据工厂中的上一行获取数据

json - 如何将数据从 CosmosDB 中的文档传输到 Azure SQL 数据库?

azure - WebJob QueueTrigger - 每个函数的限制

c# - Azure WebJobs 异步函数中的事务支持

Azure 网络作业 : Replay successful function behavior

Azure 服务总线 - 锁定持续时间和重复,它们有何关系?

c# - Azure函数: how to efficiently send batch of messages to Service Bus using bindings?