我们正在从 Azure EventHub 的旧 SDK 版本迁移到版本 4 SDK,我们想知道是否仍然需要自己实现事件的异步处理。我们正在使用 EventProcessorClientBuilder
实现 EventProcessorClient
。所以我的问题是:在这个例子中 onEvent
方法是否被称为异步?
new EventProcessorClientBuilder()
.connectionString(connectionString)
.checkpointStore(new BlobCheckpointStore(blobContainer))
.processEvent(this::onEvent)
.buildEventProcessorClient();
我深入研究了库,发现在 PartitionPumpManager
中构建了一个 EventHubConsumerAsyncClient
。但我不是 100% 确定。
如果是异步的,是否有办法达到最大任务数和/或为单个任务设置超时?
最佳答案
“onEvent”的调用是异步的,即它不会阻塞主线程。
如下所示,有四种不同(互斥)的方式来注册回调以接收事件。每个都将传递事件的回调作为第一个参数 -
processEvent(Consumer<EventContext> onEvent);
processEvent(Consumer<EventContext> onEvent, Duration timeout);
processEvent(Consumer<EventBatchContext> onBatchOfEvents, int batchSize);
processEvent(Consumer<EventBatchContext> onBatchOfEvents, int batchSize, Duration timeout);
选项 2 将超时作为第二个参数,即等待事件可用的时间。如果超时内没有事件到达,库将使用 EventContext 对象调用回调,并在此上下文对象上调用 getEventData() 返回 null。
选项 3 和 4 允许应用程序注册回调来处理一批事件;第二个参数表示每批中所需的事件数。将使用 EventBatchContext 对象调用回调,可以从 EventBatchContext::getEvents() 访问批处理。
此外,选项 4 允许指定等待大小为 N 的批处理(其中 N 是第二个参数)传递到回调的时间。超时计时器到期后,如果只有 M 个事件(其中 M < N),则包含 M 个元素的批处理将被传递到回调。如果超时内没有可用事件,则传递给回调的 EventBatchContext 对象在调用其 getEvents() 时将返回一个空列表。
我认为选项 3 或 4 正是您正在寻找的,我们建议使用它,因为每个事件的回调都会产生轻微的开销。
关于java - EventProcessorClient 中事件的异步处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75326628/