c# - 修复 C# 异步方法顺序执行

标签 c# postgresql lambda async-await

我遇到的问题是对我正在执行的异步方法的调用是按顺序发生的。我正在将调用的任务添加到 ConcurrentBag 并等待包中的任务。我不关心这些调用的结果,我只需要确认它们已完成。但是,这些调用完全按顺序发生,这非常令人困惑。所讨论的方法通过带有参数化查询的 Npgsql 执行一些 PostgreSQL 查询。调用者获取我们自己的数据树并拉出树中的所有节点并遍历节点并在它们上执行此任务。我还使用自定义 AsyncHelper 类,它将迭代 IEnumerable 实现程序中的任务并等待其中的任务。我的 Tree 实现和 AsyncHelper 都在另一段代码中进行了测试,该代码执行与此代码相同的基本原理,它按预期异步执行任务。

我在函数调用上添加了日志记录,以确认这些调用是按顺序发生的。我事件还从包中取出方法并运行该方法,它仍然做同样的事情,它按顺序发生并且在完成之前不会继续我的循环。我所有的方法都标记为异步,直到循环结束后我才等待它们。

//method executing sequentially
public static async Task<List<ContactStatistic>> getContactStats(Guid tenantId, DateTime start, DateTime end, Breakdown breakdown) {
    if (!await Postgres.warmConnection(5)) { return null; }
    var hierarchy = await getTreeForTenant<TenantContactStatsNode>(tenantId);

    //perform calculations to determine stats for each element
    var calculationTasks = new ConcurrentBag<Task>();
    var allData = await hierarchy.getAllData();
    var timestampGotAllData = DateTime.Now;

    foreach (var d in allData) {
        calculationTasks.Add(d.getContactStats(start, end, breakdown));
    }

    Console.WriteLine("about to await all the tasks");
    //await the tasks to complete for calculations
    await AsyncHelper.waitAll(calculationTasks);
}


//method it's calling
public async Task getContactStats(DateTime start, DateTime end, Breakdown breakdown) {
    //perform two async postgres calls
    //await postgres calls
    //validate PG response
    //perform manipluation on this object with data from the queries
}

我希望第一个调用调用第二个函数,将任务添加到包中,并在完成后等待它们。实际发生的是该方法正在运行、完成,然后添加到包中。

* 编辑 *

下面是请求的第二次调用的完整代码。它根据时间从数据库中获取一些数据,填补拉回时间之间的空白,因此我们有一个完全顺序的返回列表,包括数据库中没有数据的所有时间,并将其放入对象级变量中

public async Task getContactStats(DateTime start, DateTime end, Breakdown breakdown) {
    if (breakdown == Breakdown.Month) {
        //max out month start day to include all for the initial month in the initial count
        start = new DateTime(start.Year, start.Month, DateTime.DaysInMonth(start.Year, start.Month));
    } else {
        //day breakdown previous stats should start the day before given start day
        start = start.AddDays(-1);
    }

    var tran = new PgTran();
    var breakdownQuery = breakdown == Breakdown.Day ? Queries.GET_CONTACT_DAY_BREAKDOWN : Queries.GET_CONTACT_MONTH_BREAKDOWN;
    tran.setQueries(Queries.GET_CONTACT_COUNT_BEFORE_DATE, breakdownQuery);
    tran.setParams(new NpgsqlParameter("@tid", tenantId), new NpgsqlParameter("@start", start), new NpgsqlParameter("@end", end));
    var tranResults = await Postgres.getAll<ContactDayStatistic>(tran);
    //ensure transaction returns two query results
    if (tranResults == null || tranResults.Count != 2) { return; }


    //ensure valid past count was retrieved
    var prevCountResult = tranResults[0];
    if (prevCountResult == null || prevCountResult.Count != 1) { return; }
    var prevStat = new ContactDayStatistic(start.Day, start.Month, start.Year, prevCountResult[0].count);
    //ensure valid contact stat breakdown was retrieved
    var statBreakdown = tranResults[1];
    if (statBreakdown == null) { return;}

    var datesInBreakdown = new List<DateTime?>();
    //get all dates in the returned stats
    foreach (var e in statBreakdown) {
        var eventDate = new DateTime(e.year, e.month, e.day);
        if (datesInBreakdown.Find(item => item == eventDate) == null)
            datesInBreakdown.Add(eventDate);
    }
    //sort so they are sequential
    datesInBreakdown.Sort();

    //initialize timeline starting with initial breakdown
    var fullTimeline = new List<ContactStatistic>();
    //convert initial stat to the right type for final display
    fullTimeline.Add(breakdown == Breakdown.Month ? new ContactStatistic(prevStat) : prevStat);
    foreach (var d in datesInBreakdown) {
        //null date is useless, won't occur, nullable date just for default value of null
        if (d == null) { continue; }
        var newDate = d.Value;
        //fill gaps between last date given and this date
        ContactStatistic.fillGaps(breakdown, newDate, prevStat.getDate(), prevStat.count, ref fullTimeline, false);
        //get stat for this day
        var stat = statBreakdown.Find(item => d == new DateTime(item.year, item.month, item.day));
        if (stat == null) { continue; }
        //add last total for a rolling total of count
        stat.count += prevStat.count;
        fullTimeline.Add(breakdown == Breakdown.Month ? new ContactStatistic(stat) : stat);
        prevStat = stat;
    }
    //fill gaps between last date and end
    ContactStatistic.fillGaps(breakdown, end, prevStat.getDate(), prevStat.count, ref fullTimeline, true);
    //cast list to appropriate return type
    contactStats.Clear();
    contactStats = fullTimeline;
}

* 编辑 2 * 下面是 AsyncHelper 用来等待这些任务的代码。此函数非常适用于使用同一框架的其他代码,它基本上只是清理必须等待枚举任务的代码。

public static async Task waitAll(IEnumerable<Task> coll) {
    foreach (var taskToWait in coll) {
        await taskToWait;
    }  
}

* 编辑 3 * 根据建议,我将 waitAll() 更改为使用 Task.WhenAll() 而不是 foreach 循环,但是问题仍然存在。

public static async Task waitAll(IEnumerable<Task> coll) {
    await Task.WhenAll(coll);
}

* 编辑 4 * 为确保这不是 Postgres 调用造成的,我将第二种方法更改为仅执行打印行,然后休眠 200 毫秒以保持执行路径清晰。我仍然注意到这是完全按顺序发生的(甚至导致我对该函数的 POST 超时,因为实际的实际调用需要将近 20 毫秒)。下面是用于演示该更改的代码

public async Task getContactStats(DateTime start, DateTime end, Breakdown breakdown) {
    Console.WriteLine("CALLED!");
    Thread.Sleep(200);
}

* 编辑 5 * 根据建议,我尝试了一个并行的 foreach 来尝试填充任务的 ConcurrentBag 而不是普通的 foreach。我在这里遇到一个问题,即并行 foreach 在第一次添加完成后完成,并且不会立即添加所有任务。

var calculationTasks = new ConcurrentBag<Task>();
var allData = await hierarchy.getAllData();
var timestampGotAllData = DateTime.Now;
Parallel.ForEach(allData, item => {
    Console.WriteLine("trying parallel foreach");
    calculationTasks.Add(item.getContactStats(start, end, breakdown));
});

Console.WriteLine("about to await all the tasks");
//await the tasks to complete for calculations
await AsyncHelper.waitAll(calculationTasks);

* 编辑 6 * 为了视觉效果,我运行了代码并做了一些输出以显示发生的怪异情况。执行代码如下:

foreach (var d in allData) {
    Console.WriteLine("Adding call to bag");
    calculationTasks.Add(d.getContactStats(start, end, breakdown));
    Console.WriteLine("Done adding call to bag");
}

输出是:https://i.imgur.com/3y5S4eS.png

因为它每次都打印“CALLED”,所以“Done!”在“完成添加对包的调用”之前,这些执行是按顺序发生的,而不是预期的异步。

最佳答案

我的直觉是,这与您在您的方法中打开的交易有关。很难确切地说出您的代码中发生了什么,因为这里似乎有一些自定义类 - 但是在您打开交易时是否可能正在进行一些锁定?由于这发生在您第一次等待之前,因此它必须在等待代码之前“按顺序”运行。

您的自定义“waitall”方法似乎不是问题所在,但您应该考虑删除它并使用内置的 Task.WhenAll 异步等待这些方法。

关于c# - 修复 C# 异步方法顺序执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55524761/

相关文章:

c# - 从 .NET Core 应用程序引用 .NET 4.6.2 类库

c# - 以字符串作为索引和函数指针或委托(delegate)作为值的 SortedList?和通过字符串调用函数?

c# - 如何在 ASP.NET MVC Controller 中使用可选参数

haskell - 匿名函数中的参数

c# - 如何在不使用 .net 中的属性的情况下在 dynamodb 中映射类属性

python - 使用 PyQt 连接到 Sqlalchemy 数据库并将数据库内容保存到 PyQt 中的 TableView

Python( Django ) pip 在 MacOS 中安装 psycopg2 错误

sql - 在连接表中的 SQL 中为每个 id 返回一个结果

java - 使用 Lambda 表达式在 Selenium 中使用 Java 切换窗口

c++ - GCC 无法使用 init-capture 捕获 'this' 指向模板类型的指针