我有一个以下形式的服务总线触发函数:
public static void ProcessJobs([ServiceBusTrigger("topicname", "subscriptionname", AccessRights.Listen)] BrokeredMessage input, [ServiceBus("topic2name", "subscription2name", AccessRights.Send)] out BrokeredMessage output)
{
output = new BrokeredMessage(input.GetBody<String>());
}
我的用例是,此函数只是从一个主题获取消息并将其推送到另一个主题。我不想在此过程中从源主题中删除消息。
这可以实现吗?
此外,我在哪里可以找到有关 AccessRights 及其如何影响消息访问的更多信息。例如:在上面的示例中,我使用 AccessRights.Listen 从输入主题获取消息,但在这些消息上完成函数调用后,它似乎仍然在“删除”消息。
最佳答案
正如 Jambor 所说,默认行为是在函数成功完成时完成消息(请参阅 documentation ),或者在函数失败时放弃。
您可以在 SDK Repo 中看到此行为的实现查看 MessageProcessor
类的代码:
public virtual async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (result.Succeeded)
{
if (!MessageOptions.AutoComplete)
{
// AutoComplete is true by default, but if set to false
// we need to complete the message
cancellationToken.ThrowIfCancellationRequested();
await message.CompleteAsync();
}
}
else
{
cancellationToken.ThrowIfCancellationRequested();
await message.AbandonAsync();
}
}
有趣的一点:这是一个虚函数。
ServiceBusConfiguration
公开 MessagingProvider
属性。
如果您查看 SDK Repo 中默认 MessagingProvider
类的代码,您将看到您可以重写负责创建新 MessageProcessor
的方法:
/// <summary>
/// Creates a <see cref="MessageProcessor"/> for the specified ServiceBus entity.
/// </summary>
/// <param name="entityPath">The ServiceBus entity to create a <see cref="MessageProcessor"/> for.</param>
/// <returns>The <see cref="MessageProcessor"/>.</returns>
public virtual MessageProcessor CreateMessageProcessor(string entityPath)
{
if (string.IsNullOrEmpty(entityPath))
{
throw new ArgumentNullException("entityPath");
}
return new MessageProcessor(_config.MessageOptions);
}
这个函数也是虚拟的。
现在您可以创建自己的 MessagingProvider
和 MessageProcessor
实现:
public class CustomMessagingProvider : MessagingProvider
{
private readonly ServiceBusConfiguration _config;
public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
{
_config = config;
}
public override MessageProcessor CreateMessageProcessor(string entityPath)
{
if (string.IsNullOrEmpty(entityPath))
{
throw new ArgumentNullException("entityPath");
}
return new CustomMessageProcessor(_config.MessageOptions);
}
class CustomMessageProcessor : MessageProcessor
{
public CustomMessageProcessor(OnMessageOptions messageOptions) : base(messageOptions)
{
}
public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (!result.Succeeded)
{
cancellationToken.ThrowIfCancellationRequested();
await message.AbandonAsync();
}
}
}
}
并像这样配置您的 JobHost :
public static void Main()
{
var config = new JobHostConfiguration();
var sbConfig = new ServiceBusConfiguration
{
MessageOptions = new OnMessageOptions
{
AutoComplete = false
}
};
sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
config.UseServiceBus(sbConfig);
var host = new JobHost(config);
host.RunAndBlock();
}
这是这个问题的技术部分......
现在,如果您没有完成消息,该消息将一次又一次地用于相同的功能,直到您达到 MaxDeliveryCount
,然后您的消息将成为死信。因此,即使将您的函数设计为幂等,我也很确定这不是您想要的。
也许你应该多解释一下你想要实现的目标?
如果您正在寻找父子队列通信(请参阅问题评论),有一篇很好的文章解释了如何使用 ASB 设计工作流程:
或者,您可以查看Defer
BrokerMessage 对象上的方法:
Indicates that the receiver wants to defer the processing for this message.
它将允许您保留父消息,直到处理子消息。
关于Azure WebJobs SDK : Not calling message.complete() 在特定 ServiceBus 触发的函数上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39730942/