c# - 支持多线程的异步任务队列

标签 c# .net multithreading async-await

我需要实现一个库来请求 vk.com API。问题是 API 每秒仅支持 3 个请求。我想要 API 异步。

重要提示:API 应支持多线程安全访问。

我的想法是实现一些称为 throttler 的类,它允许不超过 3 个请求/秒并延迟其他请求。

接下来是界面:

public interface IThrottler : IDisposable
{
    Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}

用法就像

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// Here should be delay because 3 requests executed
var photo = await throttler.Throttle(() => api.MyPhoto());

如何实现节流?

目前我将其实现为由后台线程处理的队列。

public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    /// TaskRequest has method Run() to run task
    /// TaskRequest uses TaskCompletionSource to provide new task 
    /// which is resolved when queue processed til this element.
    var request = new TaskRequest<TResult>(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}

这是处理队列的后台线程循环的缩短代码:

private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            /// Delay method calculates actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}

是否可以在没有后台线程的情况下实现这个?

最佳答案

因此,我们将从一个更简单的问题的解决方案开始,即创建一个最多同时处理 N 个任务的队列,而不是限制为每秒启动 N 个任务,并以此为基础:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

我们还将使用以下辅助方法将 TaskCompletionSource 的结果与“任务”相匹配:

public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}

现在对于我们的实际解决方案,我们可以做的是每次我们需要执行节流操作时,我们创建一个 TaskCompletionSource,然后进入我们的 TaskQueue 并添加一个项目启动任务,匹配 TCS 到它的结果,不等待它,然后延迟任务队列 1 秒。然后任务队列将不允许任务开始,直到过去一秒不再有 N 个任务开始,而操作本身的结果与创建 Task 相同:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue<T>(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}

关于c# - 支持多线程的异步任务队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34315589/

相关文章:

Python 3 多进程分散器 : BrokenPipeError (Broken pipe)

c# - 为什么 Thread.CurrentPrincipal 需要 "await Task.Yield()"才能正确流动?

c# - 为什么在文本框中更改文本时前景色是黑色而不是绿色?

c# - 试图获得随机排序顺序

java - 嵌套捕获组如何在正则表达式中编号?

c# - 是否可以在未安装 .NET 或 XNA 的情况下运行 XNA 游戏?

android - 从守护线程更新单例

c++ - 多线程同步

c# - 如何对gridview进行排序以按特定列显示行

c# - 更新和扩展枚举 C#