c# - 限制没有。每秒由 Dns.BeginGetHostEntry 方法生成的请求数或使用任务并行库 (TPL)

标签 c# dns async-await task-parallel-library fqdn

我已使用 Dns.BeginGetHostEntry 方法根据主机名获取主机的 FQDN(主机名列表存储在 SQL 服务器数据库中)。 此方法(异步)在不到 30 分钟的时间内完成了近 150k 条记录的运行,并在存储主机名的 SQL 的同一表中更新了 FQDN。

此解决方案运行速度过快(超过了每秒 300 个请求的阈值)。由于允许没有。服务器生成的请求的数量是有限的,我的服务器被列在 top talker 中并请求停止该应用程序的运行。我必须重建此应用程序才能同步运行,现在需要 6 个多小时才能完成。

//// TotalRecords are fetched from SQL database with the Hostname (referred as host further)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        Interlocked.Increment(ref requestCounter);
        string[] arr = new string[] { i.ToString(), host }; 
        Dns.BeginGetHostEntry(host, GetHostEntryCallback,arr);
    }
    catch (Exception ex)
    {
        log.Error("Unknown error occurred\n ", ex);
    }
}
do
{
    Thread.Sleep(0);

} while (requestCounter>0);

ListAdapter.Update(TotalRecords);

问题:

  1. 有什么方法可以限制每秒生成的请求数吗?

  2. 我了解到 ParallelOptions.MaxDegreeOfParallelism 不控制每秒的线程数,那么 TPL 有什么更好的选择吗?这个能不能限制到没有。每秒请求数?

最佳答案

纯异步解决方案。

它使用一个 nuget 包 Nite.AsyncExSystem.Reactive它执行错误处理并提供 DNS 结果作为 IObservable<IPHostEntry> 出现。

这里发生了很多事情。您需要了解 reactive extensions作为标准async programming .可能有很多方法可以实现以下结果,但这是一个有趣的解决方案。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

public static class EnumerableExtensions
{
    public static IEnumerable<Func<U>> Defer<T, U>
        ( this IEnumerable<T> source, Func<T, U> selector) 
        => source.Select(s => (Func<U>)(() => selector(s)));
}


public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained
    /// </summary>
    /// <param name="desiredRateLimit"></param>
    /// <param name="currentItemCount"></param>
    /// <param name="elapsedTotalSeconds"></param>
    /// <returns></returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        var time = elapsedTotalSeconds;
        var timeout = currentItemCount / desiredRateLimit;
        return timeout - time;
    }

    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="tasks"></param>
    /// <param name="rateLimit"></param>
    /// <returns></returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){
        var s = System.Diagnostics.Stopwatch.StartNew();
        var n = 0;
        var sem = new  AsyncCountdownEvent(1);

        var errors = new ConcurrentBag<Exception>();

        return Observable.Create<T>
            ( observer =>
            {

                var ctx = new CancellationTokenSource();
                Task.Run
                    ( async () =>
                    {
                        foreach (var taskFn in tasks)
                        {
                            n++;
                            ctx.Token.ThrowIfCancellationRequested();

                            var elapsedTotalSeconds = s.Elapsed.TotalSeconds;
                            var delay = Delay( rateLimit, n, elapsedTotalSeconds );
                            if (delay > 0)
                                await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );

                            sem.AddCount( 1 );
                            Task.Run
                                ( async () =>
                                {
                                    try
                                    {
                                        observer.OnNext( await taskFn() );
                                    }
                                    catch (Exception e)
                                    {
                                        errors.Add( e );
                                    }
                                    finally
                                    {
                                        sem.Signal();
                                    }
                                }
                                , ctx.Token );
                        }
                        sem.Signal();
                        await sem.WaitAsync( ctx.Token );
                        if(errors.Count>0)
                            observer.OnError(new AggregateException(errors));
                        else
                            observer.OnCompleted();
                    }
                      , ctx.Token );

                return Disposable.Create( () => ctx.Cancel() );
            } );
    }

    #region hosts



    public static string [] Hosts = new [] { "google.com" }

    #endregion


    public static void Main()
    {
        var s = System.Diagnostics.Stopwatch.StartNew();

        var rate = 25;

        var n = Hosts.Length;

        var expectedTime = n/rate;

        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync( host );
            }
            catch (Exception e)
            {
                throw new Exception($"Can't resolve {host}", e);
            }
        } );

        IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );

        results
            .Subscribe( result =>
            {
                Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
            },
            onCompleted: () =>
            {
                Console.WriteLine( "Completed" );

                PrintTimes( s, expectedTime );
            },
            onError: e =>
            {
                Console.WriteLine( "Errored" );

                PrintTimes( s, expectedTime );

                if (e is AggregateException ae)
                {
                    Console.WriteLine( e.Message );
                    foreach (var innerE in ae.InnerExceptions)
                    {
                        Console.WriteLine( $"     " + innerE.GetType().Name + " " + innerE.Message );
                    }
                }
                else
                {
                        Console.WriteLine( $"got error " + e.Message );
                }
            }

            );

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }

    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine( "Done" );
        Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
        Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
    }
}

最后几行输出是

result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
     Exception Can't resolve adv758968.ru
     Exception Can't resolve fr.a3dfp.net
     Exception Can't resolve ads.adwitserver.com
     Exception Can't resolve www.adtrader.com
     Exception Can't resolve trak-analytics.blic.rs
     Exception Can't resolve ads.buzzcity.net

我无法粘贴完整代码,所以这里是包含主机列表的代码链接。

https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190

关于c# - 限制没有。每秒由 Dns.BeginGetHostEntry 方法生成的请求数或使用任务并行库 (TPL),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44107311/

相关文章:

redirect - DNS 映射到子文件夹

c# - 我应该如何在异步/等待操作中使用静态方法/类?

c# - 捕获由 async void 方法抛出的异常

node.js - 在 Node js 中正确使用 async wait

c# - 如何强制 WebAPI OData 扩展子句?

android - 通过主机名连接

c# - ASP.Net core 中全大写属性名称默认驼峰命名法序列化为 JSON 的问题

c++ - 如何在不使用 C++ 管理员权限的情况下设置 AD 属性值?

c# - 多个 Asp.net Web 应用程序的授权

c# - 界面上的扩展方法未显示