我想从 eventhub 接收最新消息,因为之前发送到 eventhub 的消息太多。 我设置了自己的消费组,但似乎从早期到现在只能接收消息。
有什么方法可以跳到最新的消息吗?
最佳答案
如果你知道checkpoint在azure eventhubs中,这应该很容易实现。
简单来说,checkpoint是一个存储在azure blob存储中的文件,每次从eventhub读取数据时,偏移量都会记录在checkpoint中。当下次使用相同的检查点从 eventhub 读取数据时,它将从偏移量开始读取。
因此,如果您想跳过读取旧数据,可以首先创建一个测试接收器项目,如 this并设置检查点。然后下次,在您的生产中,您可以使用相同的检查点,由于偏移量,它总是会跳过旧数据。
另一种方法是,您可以将 EventProcessorOptions
与 RegisterEventProcessorAsync
方法一起使用。当您选择使用此方法时,您需要手动从 Azure Blob 存储中删除检查点,否则此设置将被检查点覆盖。
如下所示的示例,位于您的 receiver method 中:
private static async Task MainAsync(string[] args)
{
Console.WriteLine("Registering EventProcessor...");
var eventProcessorHost = new EventProcessorHost(
EventHubName,
PartitionReceiver.DefaultConsumerGroupName,
EventHubConnectionString,
StorageConnectionString,
StorageContainerName);
//here, you can get only the data sent in event hub in the recent an hour.
var options = new EventProcessorOptions
{
InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow.AddHours(-1))
};
// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);
Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();
// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}
关于c# - 如何在c#中从Eventhub接收最新消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58854909/