遵循 EventProcessorHost example我们在 onEvents() 中实现了自定义逻辑。有些数据没有被处理,我怀疑这是因为 Java 客户端抛出的警告。
在日志中,我们看到 StorageException(用于更新租约或检查点的 Blob 存储超时)、LeaseLostException(可能是由于之前的异常)和 EventHubException(当事件中心移动或短时间离线时)。
基本上我的问题是:这些异常如何影响事件的处理以及我们如何确保没有事件被跳过(例如,通过重试和完全关闭作为最后手段的异常处理)?
我通读了 docs并搜索了其他问题,无法找到满意的答案(this和this其中一个提供了一些见解)。
我们的代码:
public class EventProcessor implements IEventProcessor {
...
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
for (EventData event : events) {
try {
String message = new String(event.getBytes(), StandardCharsets.UTF_8);
mystuff.process(message);
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 50) == 0) {
context.checkpoint(data).get();
}
} catch (Exception e) {
LOG.warn("Processing event failed: {}", e.getMessage())
}
}
}
...
}
最佳答案
根据我对 EventProcessor 的理解,您将重新处理事件而不是丢失事件。可能还有另一个根本问题。
当您调用 checkpoint
时会发生什么,它会保留该 EventData
的序列号(偏移量等)流,表示“我已经处理了这个。”
当您收到 StorageException
时,这意味着序列号未成功保留,因此旧事件的序列号仍保留在您的 Blob 存储中。如果您遇到 EventHubException
,其中处理器在重新启动时断开连接,它将尝试声明所有租约已过期,并从最后一个成功的检查点开始处理。
如果另一个事件处理器“窃取”了您当前正在处理的分区,您将得到LeaseLostException
。当 EventProcessor 的多个实例正在运行并且客户端尝试平衡正在运行的实例之间的分区数量时,就会发生这种情况。
关于java - Azure 事件中心 Java 客户端未处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50953546/