java - 为 Azure Cosmos 运行 ChangeFeedProcessor 的多个实例

标签 java azure azure-cosmosdb azure-cosmosdb-sqlapi

我正在尝试运行 ChangeFeedProcessor 的 2 个实例,它们都指向相同的集合,并在 Cosmos 帐户中使用相同的租赁集合。我在两个实例中都指定了唯一的hostName

我的目的是根据逻辑分区(根据 Microsoft 文档)在实例之间分配 Feed 负载

当我尝试启动第二个实例时,我在控制台中收到以下异常。

有什么不同的方法可以实现这一目标吗?

Exception in thread "pool-23-thread-3" java.lang.NullPointerException at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115) at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl.java:89) at java.lang.Thread.run(Thread.java:748) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Exception in thread "pool-19-thread-3" java.lang.NullPointerException at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115) at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl.java:89) at java.lang.Thread.run(Thread.java:748) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Exception in thread "pool-25-thread-3" java.lang.NullPointerException at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)...etc

我使用了下面的maven依赖

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.0.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

代码片段

  1. 创建 ChangeFeedProcessors 列表(针对数据库中找到的所有容器)

        //FEED DATABASE
        CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);

        //LEASE DATABASE
        CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);

        //List of Containers in Feed Database
        List<CosmosContainerProperties> containerPropertiesList = null;
        try {
            Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
            List<FeedResponse<CosmosContainerProperties>> list = containers.toStream().collect(Collectors.toList());//Abhishek Optimize
            containerPropertiesList = list.get(0).results();
        }
        catch (Exception e) {
            System.out.println("Fail to query Containers");
            throw new ServiceException("Fail to query Containers");
        }

containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
                //FEED CONTAINER
                String containerName = cosmosContainerProperties.getString("id");
                CosmosContainer feedContainer = feedDatabase.getContainer(containerName);

                //LEASE CONTAINER
                String leaseContainerName = containerName + "-leases";
                CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);

                //Building ChangeFeedProcessor for current Container
                ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
                changeFeedProcessorOptions.startTime(OffsetDateTime.now());

                ChangeFeedProcessor changeFeedProcessor = null;
                try {
                    ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()                           
                            .hostName("Host1")//used Host2 in the other Host
                            .feedContainer(feedContainer)
                            .leaseContainer(leaseContainer)
                            .options(changeFeedProcessorOptions)
                            .handleChanges(docs -> {
                                documentChangeHandler.processChanges(containerName, docs);
                            });
                    changeFeedProcessor = builderDefinition.build();
                }
                catch (Exception e) {
                    System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName);
                }
                resultList.add(changeFeedProcessor);

                System.out.println("processed:  " + leaseContainerName);
            });
  • 然后返回resultList,并在下面的方法中启动ChangeFeedProcessors
  • public void startChangeFeed() {
            if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
                changeFeedProcessors.parallelStream().forEach(processor->processor.start().block());
            }
            else {
                System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
            }
        }
    

    最佳答案

    从评论来看,该问题与 VPN/代理或阻止所需端口范围的内容有关。

    直接模式,需要在VPN/代理/防火墙中打开并配置一定的端口范围:

    Connection Modes

    如果无法配置,您可以切换到网关/HTTP 模式。

    更改源处理器使用第二个 Leases 集合来存储状态(主要在此处 https://learn.microsoft.com/azure/cosmos-db/change-feed-processor#components-of-the-change-feed-processor 以及 .NET 示例进行解释,但概念是相同的)。当前模型为每个物理分区创建 1 个租约(我说当前模型是因为此实现可以在将来改进以实现更好的分配),并且每个租约只能由 1 个实例拥有。因此,如果您有 2 个租约和 2 个实例,则每个实例将拥有 1 个租约。

    每个实例将根据其拥有的租约处理分区中的更改。

    负载分布为 90/10 意味着集合中发生的更改似乎主要发生在一个分区(热分区)中,并且分布不均匀。

    关于java - 为 Azure Cosmos 运行 ChangeFeedProcessor 的多个实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59623040/

    相关文章:

    azure - 在 DocumentDb 中插入或更新

    java - Lombok 1.18.0 和 Jackson 2.9.6 不能一起工作

    java - 通过Java每天根据计划时间自动备份MySql DB

    azure - 当发布管道中 devops 的代理中的其他任务/作业失败时,如何仅在一个任务/作业中创建错误或通知

    python - Django channel 和 azure

    azure - 如何在 Cosmos DB 容器上显示唯一键?

    java - Camel SEDA 和 VM 端点不使用所有线程

    java - 使用 JAXB 将 XSD 转换为其类时出错

    azure - 将托管身份与 Cosmos Db 表 Api 结合使用

    Azure Cosmos 数据库监控指标