java - Azure 事件中心 Java 客户端未处理数据

标签 java azure azure-eventhub

遵循 EventProcessorHost example我们在 onEvents() 中实现了自定义逻辑。有些数据没有被处理,我怀疑这是因为 Java 客户端抛出的警告。

在日志中,我们看到 StorageException(用于更新租约或检查点的 Blob 存储超时)、LeaseLostException(可能是由于之前的异常)和 EventHubException(当事件中心移动或短时间离线时)。

基本上我的问题是:这些异常如何影响事件的处理以及我们如何确保没有事件被跳过(例如,通过重试和完全关闭作为最后手段的异常处理)?

我通读了 docs并搜索了其他问题,无法找到满意的答案(thisthis其中一个提供了一些见解)。

我们的代码:

public class EventProcessor implements IEventProcessor {
    ...
    @Override
    public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
        for (EventData event : events) {
            try {
                String message = new String(event.getBytes(), StandardCharsets.UTF_8);

                mystuff.process(message);

                this.checkpointBatchingCount++;
                if ((checkpointBatchingCount % 50) == 0) {
                    context.checkpoint(data).get();
                }
            } catch (Exception e) {
                LOG.warn("Processing event failed: {}", e.getMessage())
            }
        }
    }
    ...
}

最佳答案

根据我对 EventProcessor 的理解,您将重新处理事件而不是丢失事件。可能还有另一个根本问题。

当您调用 checkpoint 时会发生什么,它会保留该 EventData 的序列号(偏移量等)流,表示“我已经处理了这个。”

当您收到 StorageException 时,这意味着序列号未成功保留,因此旧事件的序列号仍保留在您的 Blob 存储中。如果您遇到 EventHubException ,其中处理器在重新启动时断开连接,它将尝试声明所有租约已过期,并从最后一个成功的检查点开始处理。

如果另一个事件处理器“窃取”了您当前正在处理的分区,您将得到LeaseLostException。当 EventProcessor 的多个实例正在运行并且客户端尝试平衡正在运行的实例之间的分区数量时,就会发生这种情况。

关于java - Azure 事件中心 Java 客户端未处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50953546/

相关文章:

azure - 流分析作业部署为 Azure 资源管理器 (ARM) 模板

java - Socket通信,Java客户端C服务器

java - 使用HQL Editor查询时Hibernate报错

azure - 使用 Python SDK 和用户或系统分配的托管标识从 Azure Key Vault 读取 Azure Synapse 中的 secret

c# - Azure Web PubSub 和事件网格/事件中心之间有什么区别?

python - Azure 事件中心 Python SDK

java - hibernate auto create in-memory hsqldb 导致找不到序列

java - 循环和 JOptionpane

c# - 有人在 Azure Web Apps 中使用 SQL 依赖项时遇到问题吗?

azure - 从 Azure Blob 存储下载文件 - 错误 : The given path's format is not supported