c# - 在 foreach 中进行多个 http 调用,迭代数千条记录

标签 c# foreach dotnet-httpclient .net-4.8

我正在 .NET 中实现一项 Web 作业,该作业必须负责将大约 25,000 个用户从我们的数据库导入到由 IdentityServer 管理的新数据库(而不是由我们处理) .

就 http 调用而言,此操作将非常昂贵,因为对于每个用户,我都必须进行这些调用:

  • 创建用户;
  • 为创建的用户分配任何角色(可能不止一个,因此还会有另一次迭代);
  • 分配(我确定)至少两个声明(另一次迭代);
  • 更改用户密码。

我无法以任何其他方式处理它,因为我导入用户的实例的所有者已将这些定义为每个用户要采取的​​步骤。

我是这么想的: 这是我的“入口”点:

internal async static Task<WorkerResponse> SendUsersAsync(string token, IDictionary<int, UserToSend> usersToSend, ICollection<Roles> roles, IMapper mapper, TextWriter logger = null)
{
    string userName = string.Empty;
    try
    {
        foreach (KeyValuePair<int, UserToSend> userToSend in usersToSend)
        {
            int externalId = userToSend.Key;
            userName = userToSend.Value.UserName;
            int fK_Soggetto = userToSend.Value.FK_Soggetto;
            logger?.Write($"Sending user {userName} with (External)Id {externalId}");
            logger?.WriteLine(string.Empty);

            UserToPost userToPost = mapper.Map<UserToPost>(userToSend.Value);

            (bool isSuccess, string messageOrUserId) = await SendUserToAuthorityAsync(token, userToPost);
            if (!isSuccess)
                return new WorkerResponse(isSuccess: false, message: messageOrUserId);

            logger?.Write($"User {userName} sent.");
            logger?.WriteLine(string.Empty);

            if (userToSend.Value.ConsulenzaRoles?.Count > 0)
            {
                logger?.Write($"Appending roles for user {userName} with id {messageOrUserId}");
                logger?.WriteLine(string.Empty);
                (bool isSuccessRoles, string messageRoles) = await SendUserRolesToAuthorityAsync(
                                                                    token,
                                                                    SendingUserHelper.GetUserRolesToPost(userToSend.Value.ConsulenzaRoles, roles, messageOrUserId),
                                                                    userName,
                                                                    logger);

                if (!isSuccessRoles)
                    return new WorkerResponse(isSuccess: false, message: messageRoles);
            }

            logger?.Write($"Appending claims for user {userName} with id {messageOrUserId}");
            logger?.WriteLine(string.Empty);
            ICollection<UserClaimToPost> userClaimsToPost = SendingUserHelper.GetUserClaimsToPost(messageOrUserId, externalId, fK_Soggetto);
            (bool isSuccessClaims, string msg) = await SendUserClaimsToAuthorityAsync(token, userClaimsToPost, userName, logger);
            if (!isSuccessClaims)
                return new WorkerResponse(isSuccess: false, message: msg);
        }
    }
    catch (BusinessLogicException ex)
    {
        return new WorkerResponse(isSuccess: false, message: $"{ex.Message} {ex.InnerException?.Message}");
    }

    return new WorkerResponse(isSuccess: true, message: $"user {userName} successfully added");
}

每个方法内部的位置(发送用户、发送角色等) 所有方法的结构或多或少都是这样的(主要使用 (HttpClient httpClient = new HttpClient())):

private async static Task<(bool, string)> SendUserToAuthorityAsync(string token, UserToPost userToPost, TextWriter logger = null)
{
    try
    {
        logger?.WriteLine($"Attempting to request auth to send User {userToPost.UserName}...");
        logger?.WriteLine(string.Empty);

        IdentityServerUser userResponse;
        using (HttpClient httpClient = new HttpClient())
        {
            httpClient.BaseAddress = new Uri(AdminAuthority);
            httpClient.DefaultRequestHeaders.Accept.Clear();
            httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
            httpClient.DefaultRequestHeaders.Add("Authorization", $"Bearer {token}");

            string bodyString = JsonConvert.SerializeObject(userToPost);
            byte[] buffer = Encoding.UTF8.GetBytes(bodyString);
            ByteArrayContent byteContent = new ByteArrayContent(buffer);
            byteContent.Headers.ContentType = new MediaTypeHeaderValue("application/json");

            using (HttpResponseMessage responseMessage = await httpClient.PostAsync(IdentityServerEndpoint.PostUsers, byteContent))
            {
                responseMessage.EnsureSuccessStatusCode();

                userResponse = JsonConvert.DeserializeObject<IdentityServerUser>(await responseMessage.Content.ReadAsStringAsync());
                if (userResponse?.IsOk == false)
                    return (false, $"Error deserializing user {userToPost.UserName} from content string to abstracted model");
            }
        }

        if (userResponse?.Id == default)
            return (false, $"Error deserializing user {userToPost.UserName} from content string to abstracted model");

        return (true, $"{userResponse.Id}");
    }
    catch (Exception ex)
    {
        return (false, $"Error sending user '{userToPost.UserName}'.\n{ex.Message}{ex.InnerException?.Message}");
    }
}

我想知道是否有更智能的方法可以在 foreach 中进行此调用。 例如,我不确定 HttpClientusing 是否安全,也不确定是否最好考虑一个“惰性”系统,将用户发送到时间,而无需立即调用数千个电话。谢谢

最佳答案

我有几个建议。

对整个作业使用单个 HttpClient 实例。 HttpClient 设计为由多个线程使用,创建太多实例(即使您正确处置它们)会导致问题。

确保用户列表保留在某处,这样当您尝试导入用户时就不会重新查询整个用户列表。将列表分成几组,以便您可以在影响较小时发现常见问题。例如,导入 1 个用户,并让他们确保拥有所需的权限。然后导入 10 个用户并目视验证他们似乎拥有所需的权限。然后导入 100 个用户并确保您没有看到任何意外错误。然后是 1000。然后是剩余的用户。

仔细考虑如果只有一个用户未能发送邮件,您真正希望发生什么情况。您真的想阻止所有其他用户被发送吗?一种常见的方法是将失败用户的信息保存到“死信”文件中,您可以在其中分析它们,并在修复根本问题后轻松地为这些用户重新运行逻辑。

仔细考虑如果给定用户只有部分流程成功,会发生什么情况。理想情况下,您会将流程编写为幂等的,这样您就可以再次运行整个流程,并且已经发生的步骤不会导致问题。例如,请确保仅当源系统中尚不存在该用户时才创建该用户,并确保如果该用户已具有这些角色,则为该用户分配角色不会产生任何影响。

在担心并发性之前,请考虑一下它会产生多大的影响。如果您设法在 30 分钟内导入 1000 个用户而不进行优化,那么您可能能够在大约 12 小时内完成剩余的 24,000 个用户。如果这是一项一次性工作,那么让事情保持简单并让该工作运行 12 小时可能是值得的,而不是花几个小时的开发时间尝试优化它并冒着引入更多错误的风险。

如果您确实认为增加复杂性值得,那么relatively simple将异步任务放入受限制的并发管道中。您的任务本质上是异步的,因此不必担心多线程。当您在数百个用户的 block 上测试流程时,您可能会发现节流大小的 yield 递减点。例如,您可能会发现一次导入 10 个用户可以使性能提升 8 倍,而一次导入 20 个用户只能最低限度地提高速度,而一次导入 50 个用户会使服务器开始拒绝您的请求。不要贪婪:将 throttle 设置在最佳位置,这样您就可以通过相对较低的并发性获得巨大的性能提升。

您在发送 JSON 字符串之前将其转换为字节数组的操作似乎是多余的。为什么不直接使用 StringContent 来代替呢?

关于c# - 在 foreach 中进行多个 http 调用,迭代数千条记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73988531/

相关文章:

c# - LINQ 使用 ROW_NUMBER() 函数?

c# - 检查是否已附加特定的事件处理程序方法

c# - 是否必须在请求之间处理 HttpClient 和 HttpClientHandler?

c# - MVC 3 多表单模型传递给字典

c# - 是否可以使用 WCF 架构每天处理 1B 请求?

java - 迭代列表时删除元素

javascript - 将数组变量传递给函数的下一部分

php - 使用 while/foreach 构建复杂的表

c# - HttpClient 请求限制器和缓冲区的实现

c# - 从 MVP Winform 客户端使用 WebAPI 的设计模式