我正在尝试创建一种方法来处理事件中心中的事件峰值。我当前的 poc 解决方案只是在消耗事件时触发并忘记任务,而不是等待它们,然后使用信号量限制并行任务量以避免资源匮乏。
限制事物的实用程序:
public class ThrottledParallelTaskFactory
{
...
public Task StartNew(Func<Task> func)
{
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_semaphoreSlim.Wait(_timeout);
_ = Task.Run(func)
.ContinueWith(t =>
{
if (t.Status is TaskStatus.Faulted or TaskStatus.Canceled or TaskStatus.RanToCompletion)
{
_semaphoreSlim.Release();
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
}
if (t.Status is TaskStatus.Canceled or TaskStatus.Faulted)
{
_logger?.LogError(t.Exception, "Parallel task failed");
}
});
return Task.CompletedTask;
}
}
我的EventProcessorClient.ProcessEventAsync
委托(delegate):
private Task ProcessEvent(ProcessEventArgs arg)
{
var sw = Stopwatch.StartNew();
try
{
_throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
}
catch (Exception e)
{
_logger.LogError(e, "Failed to process event");
}
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
return Task.CompletedTask;
}
运行此设置一段时间后,我注意到当我配置的限制为 15 时,我的节流器的信号量在并行运行 2-3 个任务时达到最大值。这表明我的处理程序需要 333-500 毫秒才能完成,但处理程序内的秒表显示整个处理程序需要 0 毫秒执行。后来我添加了处理程序开始/结束时的时间戳记录以确认它,它确实需要 0-1 毫秒,但它们之间有一个神秘的 300-600 毫秒差距。 注意:对于当前的测试,该客户端正在处理数百万个事件的积压,而不是处理实时数据,这可能会导致事件之间出现类似的延迟。
在每个事件之后,EventProcessorClient
是否会在内部设置检查点? 300-500 毫秒在我看来似乎很长。
我既使用了默认的缓存事件/预取计数,又增加了计数,没有太大区别。
编辑:
最终这不是与实现相关的网络问题
最佳答案
您没有衡量正确的事情,基本上您使用的是错误的 async/await 和 Task。
private Task ProcessEvent(ProcessEventArgs arg)
{
var sw = Stopwatch.StartNew();
try
{
_throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
}
catch (Exception e)
{
_logger.LogError(e, "Failed to process event");
}
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
return Task.CompletedTask;
}
在上面的代码中,不等待对_throttledParallelTaskFactory.StartNew
的调用。所以秒表没有什么可测量的。此外,由于不等待调用,因此不会捕获任何异常。
您应该将异常处理和时间测量移至 StartNew
方法,如下所示:
private Task ProcessEvent(ProcessEventArgs arg)
{
_throttledParallelTaskFactory.StartNew(() => Task.Delay(1000));
return Task.CompletedTask;
}
public class ThrottledParallelTaskFactory
{
public async Task StartNew(Func<Task> func)
{
var sw = Stopwatch.StartNew();
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_semaphoreSlim.Wait(_timeout);
try
{
await func.Invoke();
}
catch
{
_logger.LogError(e, "Failed to process event");
_logger?.LogError(t.Exception, "Parallel task failed");
}
finally
{
_semaphoreSlim.Release();
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
}
}
}
看看我们如何摆脱对 ContinueWith
的调用?此外,由于 func 已经代表一个 Task
,因此无需将代码包装在对 Task.Run
的调用中。
Does by any chance EventProcessorClient checkpoint internally after every single event?
不,事实并非如此。您必须手动执行检查点。
关于c# - EventProcessorClient 事件之间的延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70555889/