c# - EventProcessorClient 事件之间的延迟

标签 c# azure azure-eventhub

我正在尝试创建一种方法来处理事件中心中的事件峰值。我当前的 poc 解决方案只是在消耗事件时触发并忘记任务,而不是等待它们,然后使用信号量限制并行任务量以避免资源匮乏。

限制事物的实用程序:

    public class ThrottledParallelTaskFactory
    {
        ...

        public Task StartNew(Func<Task> func)
        {
            _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}",  _semaphore.CurrentCount, _limit);
            _semaphoreSlim.Wait(_timeout);  
            
            _ = Task.Run(func)
                .ContinueWith(t =>
                {
                    if (t.Status is TaskStatus.Faulted or TaskStatus.Canceled or TaskStatus.RanToCompletion)
                    {
                        _semaphoreSlim.Release();
                        _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
                    }
                    if (t.Status is TaskStatus.Canceled or TaskStatus.Faulted)
                    {
                        _logger?.LogError(t.Exception, "Parallel task failed");
                    }
                });
            return Task.CompletedTask;
        }
    }

我的EventProcessorClient.ProcessEventAsync委托(delegate):

 private Task ProcessEvent(ProcessEventArgs arg)
        {
            var sw = Stopwatch.StartNew();
            try
            {
                _throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Failed to process event");
            }
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
            return Task.CompletedTask;
        }

运行此设置一段时间后,我注意到当我配置的限制为 15 时,我的节流器的信号量在并行运行 2-3 个任务时达到最大值。这表明我的处理程序需要 333-500 毫秒才能完成,但处理程序内的秒表显示整个处理程序需要 0 毫秒执行。后来我添加了处理程序开始/结束时的时间戳记录以确认它,它确实需要 0-1 毫秒,但它们之间有一个神秘的 300-600 毫秒差距。 注意:对于当前的测试,该客户端正在处理数百万个事件的积压,而不是处理实时数据,这可能会导致事件之间出现类似的延迟。

在每个事件之后,EventProcessorClient 是否会在内部设置检查点? 300-500 毫秒在我看来似乎很长。 我既使用了默认的缓存事件/预取计数,又增加了计数,没有太大区别。

编辑:

最终这不是与实现相关的网络问题

最佳答案

您没有衡量正确的事情,基本上您使用的是错误的 async/await 和 Task。

        private Task ProcessEvent(ProcessEventArgs arg)
        {
            var sw = Stopwatch.StartNew();
            try
            {
                _throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Failed to process event");
            }
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
            return Task.CompletedTask;
        }

在上面的代码中,不等待对_throttledParallelTask​​Factory.StartNew的调用。所以秒表没有什么可测量的。此外,由于不等待调用,因此不会捕获任何异常。

您应该将异常处理和时间测量移至 StartNew 方法,如下所示:

        private Task ProcessEvent(ProcessEventArgs arg)
        {
            _throttledParallelTaskFactory.StartNew(() => Task.Delay(1000));
            
            return Task.CompletedTask;
        }
public class ThrottledParallelTaskFactory
{
    public async Task StartNew(Func<Task> func)
    {
        var sw = Stopwatch.StartNew();

        _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
        _semaphoreSlim.Wait(_timeout);
        
        try
        {
            await func.Invoke();
        }
        catch
        {
            _logger.LogError(e, "Failed to process event");
            _logger?.LogError(t.Exception, "Parallel task failed");
        }
        finally
        {
            _semaphoreSlim.Release();
            _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
        }
    }
}

看看我们如何摆脱对 ContinueWith 的调用?此外,由于 func 已经代表一个 Task,因此无需将代码包装在对 Task.Run 的调用中。

Does by any chance EventProcessorClient checkpoint internally after every single event?

不,事实并非如此。您必须手动执行检查点。

关于c# - EventProcessorClient 事件之间的延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70555889/

相关文章:

c# - 如何在 C# 中随机化种子

amazon-web-services - 有没有办法将数据从 AWS Kinesis 发送到 Azure 事件中心?

node.js - 如何使用node js获取Azure EventHub数据

azure - 高吞吐量发送到 EventHubs 导致 MessagingException/TimeoutException/服务器无法处理请求错误

c# - 选择字符串的特定部分 C#

c# - 使用 NEST 构建静态查询

c# - 操作无法完成。无效指针 - Visual Studio 2015 Update 3

azure - 将 Terraform 转换为 ARM

azure - 转移资源时保留RBAC权限

android - 如何在 Azure Devops 管道中运行 Espresso UI 测试