java - azure 事件中心 - 仅处理新消息

标签 java azure azure-eventhub

在 Java 中拥有简单的事件中心客户端(只有 1 个分区)

public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();

        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(connectionString, eventHubName)
                .consumerGroup("$default")
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)//.checkpointStore(new SampleCheckpointStore());
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();

        System.out.println("Press enter to stop.");
        System.in.read();

        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");

        System.out.println("Exiting process");
    }

如果我先运行客户端然后发送消息,它会按预期工作。消息已处理。

如果我停止客户端,然后将消息发送到事件中心,然后启动客户端,之前发送的消息是 根本没有处理。处理之后发送的消息。为什么?

如果我停止客户端,然后删除 Azure Blob 存储中的检查点数据,然后启动客户端,则不会处理事件中心中的现有消息。处理之后发送的消息。为什么?

使用的库:

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs</artifactId>
            <version>5.12.2</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
            <version>1.13.0</version>
        </dependency>

我试过了

  • 将库版本更改为 5.10,但没有帮助
  • 使用不同的检查点存储(我使用内存一个),但没有变化 完全没有

最佳答案

在其默认配置中,Java 处理器将每个分区的读取器定位在 eventPosition.latest()当没有找到检查点时,这意味着它将只读取处理器启动后发布的事件。 (全套定位逻辑可见here)

构建处理器时,initialPartitionEventPosition可以提供map来为每个分区指定不同的起始位置。

关于java - azure 事件中心 - 仅处理新消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73086302/

相关文章:

java - 关于进口声明

java - 无法使用@IdClass 转换实体中的请求元素

用于获取合并请求中已更改文件列表的 Azure Repos REST API

git - 从 git azure 获取存储库统计信息/信息

azure - 寻求有关 Azure 服务总线中的事件中心与主题的清晰信息

azure - 针对 Azure 事件中心的 RBAC 身份验证

java - HADOOP - 作为映射器输出生成的输出文件数

java.text.ParseException : Unparseable date

Azure 应用程序配置、 key 保管库和托管服务身份 (.NET Core 3.1)

postgresql - Azure Postgresql 数据库诊断设置中看不到事件中心命名空间