java - EventProcessorClient 中事件的异步处理

标签 java azure asynchronous azure-eventhub

我们正在从 Azure EventHub 的旧 SDK 版本迁移到版本 4 SDK,我们想知道是否仍然需要自己实现事件的异步处理。我们正在使用 EventProcessorClientBuilder 实现 EventProcessorClient。所以我的问题是:在这个例子中 onEvent 方法是否被称为异步?

new EventProcessorClientBuilder()
        .connectionString(connectionString)
        .checkpointStore(new BlobCheckpointStore(blobContainer))
        .processEvent(this::onEvent)
        .buildEventProcessorClient();

我深入研究了库,发现在 PartitionPumpManager 中构建了一个 EventHubConsumerAsyncClient。但我不是 100% 确定。

如果是异步的,是否有办法达到最大任务数和/或为单个任务设置超时?

最佳答案

“onEvent”的调用是异步的,即它不会阻塞主线程。

如下所示,有四种不同(互斥)的方式来注册回调以接收事件。每个都将传递事件的回调作为第一个参数 -

processEvent(Consumer<EventContext> onEvent);
processEvent(Consumer<EventContext> onEvent, Duration timeout);
processEvent(Consumer<EventBatchContext> onBatchOfEvents, int batchSize);
processEvent(Consumer<EventBatchContext> onBatchOfEvents, int batchSize, Duration timeout);

选项 2 将超时作为第二个参数,即等待事件可用的时间。如果超时内没有事件到达,库将使用 EventContext 对象调用回调,并在此上下文对象上调用 getEventData() 返回 null。

选项 3 和 4 允许应用程序注册回调来处理一批事件;第二个参数表示每批中所需的事件数。将使用 EventBatchContext 对象调用回调,可以从 EventBatchContext::getEvents() 访问批处理。

此外,选项 4 允许指定等待大小为 N 的批处理(其中 N 是第二个参数)传递到回调的时间。超时计时器到期后,如果只有 M 个事件(其中 M < N),则包含 M 个元素的批处理将被传递到回调。如果超时内没有可用事件,则传递给回调的 EventBatchContext 对象在调用其 getEvents() 时将返回一个空列表。

我认为选项 3 或 4 正是您正在寻找的,我们建议使用它,因为每个事件的回调都会产生轻微的开销。

关于java - EventProcessorClient 中事件的异步处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75326628/

相关文章:

java - 获取用户输入 "Scanner name = new Scanner(System.in);"后如何在if语句中使用is?

java - 按两个条件拆分字符串

java - 每个 Spring Web 应用程序仅加载一次图像

azure - ADF 获取属性 "status": "Succeeded" and IF for validation

azure - 使用 Azure Rest API 和 Postman 更新逻辑应用重复频率

JavaScript 在带有回调的循环中异步

java - strictfp 敏感 JVM 示例

asp.net - 上传到Azure时,为了减小包大小,二进制格式数据库应该存储在哪里?

swift - 应该根据要求执行 segue

python - 如何在 Python 中使用 `async for`?