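I am trying to run 2 instances of ChangeFeedProcessor. Both point to the same collection and use the same lease collection in the Cosmos account, and I have specified a unique hostName in each instance.
My goal is to distribute the feed load between the instances based on logical partitions (as described in the Microsoft documentation).
When I try to start the second instance, I get the following exception in the console.
Is there a different way to achieve this?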
    Exception in thread "pool-23-thread-3" java.lang.NullPointerException
        at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
        at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
        at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66)
        at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl.java:89)
        at java.lang.Thread.run(Thread.java:748)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Exception in thread "pool-19-thread-3" java.lang.NullPointerException
        at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
        at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
        at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66)
        at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl.java:89)
        at java.lang.Thread.run(Thread.java:748)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Exception in thread "pool-25-thread-3" java.lang.NullPointerException
        at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56)
        at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)
        ...etc
I used the following Maven dependency:
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-cosmos</artifactId>
        <version>3.0.0</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
        </exclusions>
    </dependency>
Code snippets
- Create a list of ChangeFeedProcessors (one for every container found in the database)
    //FEED DATABASE
    CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);
    //LEASE DATABASE
    CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);
    //List of containers in the feed database
    List<CosmosContainerProperties> containerPropertiesList = null;
    try {
        Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
        // Collect the results of every page, not only the first one
        containerPropertiesList = containers.toStream()
                .flatMap(page -> page.results().stream())
                .collect(Collectors.toList());
    }
    catch (Exception e) {
        System.out.println("Fail to query Containers");
        throw new ServiceException("Fail to query Containers");
    }
    containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
        //FEED CONTAINER
        String containerName = cosmosContainerProperties.getString("id");
        CosmosContainer feedContainer = feedDatabase.getContainer(containerName);
        //LEASE CONTAINER
        String leaseContainerName = containerName + "-leases";
        CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);
        //Building ChangeFeedProcessor for the current container
        ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        changeFeedProcessorOptions.startTime(OffsetDateTime.now());
        ChangeFeedProcessor changeFeedProcessor = null;
        try {
            ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()
                    .hostName("Host1") //used Host2 in the other instance
                    .feedContainer(feedContainer)
                    .leaseContainer(leaseContainer)
                    .options(changeFeedProcessorOptions)
                    .handleChanges(docs -> {
                        documentChangeHandler.processChanges(containerName, docs);
                    });
            changeFeedProcessor = builderDefinition.build();
        }
        catch (Exception e) {
            System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName);
        }
        // resultList is filled from a parallel stream, so it must be thread-safe
        // (e.g. Collections.synchronizedList); skip processors that failed to build
        if (changeFeedProcessor != null) {
            resultList.add(changeFeedProcessor);
        }
        System.out.println("processed: " + leaseContainerName);
    });
- resultList is then returned, and the ChangeFeedProcessors are started in the method below
    public void startChangeFeed() {
        if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
            changeFeedProcessors.parallelStream().forEach(processor -> processor.start().block());
        }
        else {
            System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
        }
    }
Best answer
Judging from the comments, the problem is caused by a VPN/proxy or something else blocking the required port range.
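Direct mode requires a certain port range to be opened and configured in the VPN/proxy/firewall.
If that cannot be configured, you can switch to Gateway/HTTP mode.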
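The Change Feed Processor uses a second collection, the leases collection, to store its state (this is explained mainly here https://learn.microsoft.com/azure/cosmos-db/change-feed-processor#components-of-the-change-feed-processor, together with .NET samples, but the concepts are the same). The current model creates 1 lease per physical partition (I say "current" because the implementation may be improved in the future to achieve a better distribution), and each lease can be owned by only 1 instance. So if you have 2 leases and 2 instances, each instance will own 1 lease.
Each instance processes the changes from the partitions whose leases it owns.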
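To make the ownership rule concrete, here is a minimal, stdlib-only Java sketch. It assumes a simple round-robin assignment of leases to instances; this is not the SDK's actual load-balancing algorithm, and the lease and host names are hypothetical. It shows that with 2 leases and 2 instances each instance owns exactly 1 lease, and that, because a lease has a single owner, a third instance would own nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeaseDistributionSketch {

    /**
     * Simplified model (assumption, not the SDK algorithm): one lease per physical
     * partition, each lease owned by exactly one instance, spread round-robin.
     */
    static Map<String, List<String>> assignLeases(List<String> leases, List<String> hosts) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (String host : hosts) {
            owned.put(host, new ArrayList<>());
        }
        for (int i = 0; i < leases.size(); i++) {
            // Lease i goes to host (i mod hostCount), so ownership counts differ by at most 1
            String owner = hosts.get(i % hosts.size());
            owned.get(owner).add(leases.get(i));
        }
        return owned;
    }

    public static void main(String[] args) {
        // Hypothetical: a collection with 2 physical partitions, hence 2 leases
        List<String> leases = Arrays.asList("lease-0", "lease-1");
        System.out.println(assignLeases(leases, Arrays.asList("Host1", "Host2")));
        // prints {Host1=[lease-0], Host2=[lease-1]}
        System.out.println(assignLeases(leases, Arrays.asList("Host1", "Host2", "Host3")));
        // prints {Host1=[lease-0], Host2=[lease-1], Host3=[]}
    }
}
```

Under this model, adding instances beyond the number of physical partitions only adds idle instances.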
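A 90/10 load distribution means that the changes in the collection seem to happen mostly in one partition (a hot partition) rather than being evenly distributed.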
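The arithmetic behind such a split can be sketched as well. This minimal Java example assumes hypothetical change counts of 900 and 100 on two leases, each owned by a different instance; the method name and numbers are illustrative, not taken from the SDK. Each instance's share of the work equals the share of writes landing in the partitions it owns, so a hot partition produces a skewed split no matter how evenly the leases themselves are spread.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HotPartitionSketch {

    /**
     * Sums the change counts of the leases each instance owns and returns each
     * instance's fraction of all changes. Assumes every lease has an owner.
     */
    static Map<String, Double> shareByInstance(Map<String, Long> changesPerLease,
                                               Map<String, String> leaseOwner) {
        Map<String, Long> perInstance = new LinkedHashMap<>();
        long total = 0;
        for (Map.Entry<String, Long> e : changesPerLease.entrySet()) {
            String owner = leaseOwner.get(e.getKey());
            perInstance.merge(owner, e.getValue(), Long::sum);
            total += e.getValue();
        }
        Map<String, Double> share = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : perInstance.entrySet()) {
            share.put(e.getKey(), total == 0 ? 0.0 : e.getValue().doubleValue() / total);
        }
        return share;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: one hot partition receives 900 of 1000 changes
        Map<String, Long> changes = new LinkedHashMap<>();
        changes.put("lease-0", 900L);
        changes.put("lease-1", 100L);
        Map<String, String> owners = new HashMap<>();
        owners.put("lease-0", "Host1");
        owners.put("lease-1", "Host2");
        System.out.println(shareByInstance(changes, owners)); // prints {Host1=0.9, Host2=0.1}
    }
}
```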
For "java - Running multiple instances of ChangeFeedProcessor for Azure Cosmos", see the similar question on Stack Overflow: https://stackoverflow.com/questions/59623040/