我正在尝试通过 java 应用程序从 Azure 服务总线获取消息。我创建了必要的客户端配置,例如通过 ManagementClient 成功连接
@Bean
public ClientSettings getMessageReceiver() throws ServiceBusException, InterruptedException {
AzureTokenCredentials azureTokenCredentials = new ApplicationTokenCredentials(
"clientID,
"domain",
"secret",
AzureEnvironment.AZURE
);
TokenProvider tokenProvider = TokenProvider.createAzureActiveDirectoryTokenProvider(
new AzureAuthentication(azureTokenCredentials),
AzureEnvironment.AZURE.activeDirectoryEndpoint(),
null
);
ClientSettings clientSettings = new ClientSettings(tokenProvider,
RetryPolicy.getDefault(),
Duration.ofSeconds(30),
TransportType.AMQP);
return clientSettings;
}
ManagementClient managementClient =
new ManagementClient(Util.convertNamespaceToEndPointURI("namespace"),
clientSettings);
managementClient.getTopics();
但是当我尝试从特定主题获取消息时:
SubscriptionClient subscriptionClient = new SubscriptionClient("namespace", "events/subscriptions/subscription", clientSettings, ReceiveMode.PEEKLOCK);
并收到错误消息:
It is not possible for an entity that requires sessions to create a non-sessionful message receiver.
应提供哪些额外步骤?
最佳答案
您已启用Session (默认情况下禁用)在主题订阅中创建时。如果您不需要消息 session ,请在禁用“需要 session ”的情况下重新创建订阅(注意:创建订阅后您将无法更改该属性)。
或者,如果您确实需要消息 session ,请像下面一样更新代码以首先接收 session ,然后从接收到的 session 接收消息。所有代码示例均可在 here 找到以及具体的 session 示例 here .
// The connection string value can be obtained by:
// 1. Going to your Service Bus namespace in Azure Portal.
// 2. Go to "Shared access policies"
// 3. Copy the connection string for the "RootManageSharedAccessKey" policy.
String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
+ "SharedAccessKey={key}";
// Create a receiver.
// "<<topic-name>>" will be the name of the Service Bus topic you created inside the Service Bus namespace.
// "<<subscription-name>>" will be the name of the session-enabled subscription.
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sessionReceiver()
.receiveMode(ReceiveMode.PEEK_LOCK)
.topicName("<<topic-name>>")
.subscriptionName("<<subscription-name>>")
.buildAsyncClient();
Disposable subscription = receiver.receiveMessages()
.flatMap(context -> {
if (context.hasError()) {
System.out.printf("An error occurred in session %s. Error: %s%n",
context.getSessionId(), context.getThrowable());
return Mono.empty();
}
System.out.println("Processing message from session: " + context.getSessionId());
// Process message
return receiver.complete(context.getMessage());
}).subscribe(aVoid -> {
}, error -> System.err.println("Error occurred: " + error));
// Subscribe is not a blocking call so we sleep here so the program does not end.
TimeUnit.SECONDS.sleep(60);
// Disposing of the subscription will cancel the receive() operation.
subscription.dispose();
// Close the receiver.
receiver.close();
关于java - 需要 session 的实体不可能为 azure IMessageReceiver 创建非 session 消息接收器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64387019/