插入 SQL 数据库时 C# 嵌套 Parallel.ForEach

标签 c# sql-server parallel-processing ado.net parallel.foreach

我们有一个对象(XML 或 JSON)并且我们成功地将它映射到一个 DTO,它需要很长时间(5~7 分钟)才能插入到我们的数据库中,所以我们通过 Parallel.ForEach,但最终,我们注意到有一些数据输入不正确,比如 Category 包含所有同名的项目,但其他不同的属性是 100% 正确的,在其他情况下,我们注意到所有数据在一个类别中都是相同的,尽管提供的 JSON 对象没有那个。

我承认它是如此之快,不到一分钟,但插入错误,请看下面使用的代码:

JSON

[
  {
    "CategoryId": 1,
    "CategoryName": "Drinks",
    "SortOrder": 1,
    "Products": [
      {
        "ProductId": 100,
        "ProductName": "Black Tea",
        "SortOrder": 1,
        "Price": 5,
        "Choices": []
      },
      {
        "ProductId": 101,
        "ProductName": "Turkish Coffee",
        "SortOrder": 2,
        "Price": 7.5,
        "Choices": []
      },
      {
        "ProductId": 102,
        "ProductName": "Green Tea",
        "SortOrder": 3,
        "Price": 6,
        "Choices": []
      },
      {
        "ProductId": 103,
        "ProductName": "Café Latte Medium",
        "SortOrder": 4,
        "Price": 10,
        "Choices": []
      },
      {
        "ProductId": 104,
        "ProductName": "Orange Juice",
        "SortOrder": 5,
        "Price": 11,
        "Choices": []
      },
      {
        "ProductId": 105,
        "ProductName": "Mixed Berry Juice",
        "SortOrder": 6,
        "Price": 12.5,
        "Choices": []
      }
    ]
  },
  {
    "CategoryId": 1,
    "CategoryName": "Meals",
    "SortOrder": 1,
    "Products": [
      {
        "ProductId": 200,
        "ProductName": "Breakfast Meal",
        "SortOrder": 1,
        "Price": 16,
        "Choices": [
          {
            "ChoiceId": 3000,
            "ChoiceName": "Strawberry Jam",
            "SortOrder": 1,
            "Price": 0
          },
          {
            "ChoiceId": 3001,
            "ChoiceName": "Apricot Jam",
            "SortOrder": 2,
            "Price": 0
          },
          {
            "ChoiceId": 3002,
            "ChoiceName": "Orange Jam",
            "SortOrder": 3,
            "Price": 0
          },
          {
            "ChoiceId": 3003,
            "ChoiceName": "Café Latte",
            "SortOrder": 4,
            "Price": 2
          }
        ]
      },
      {
        "ProductId": 201,
        "ProductName": "Mixed Grill",
        "SortOrder": 1,
        "Price": 30,
        "Choices": [
          {
            "ChoiceId": 3004,
            "ChoiceName": "Moutabal",
            "SortOrder": 1,
            "Price": 0
          },
          {
            "ChoiceId": 3005,
            "ChoiceName": "Mineral Water",
            "SortOrder": 2,
            "Price": 0
          },
          {
            "ChoiceId": 3006,
            "ChoiceName": "French Fries",
            "SortOrder": 2,
            "Price": 0
          },
          {
            "ChoiceId": 3007,
            "ChoiceName": "Grilled Potatoes",
            "SortOrder": 2,
            "Price": 0
          }
        ]
      }
    ]
  }
]

C#代码

Parallel.ForEach(categories, (category) =>
{
    var newCreatedCategoryId = 0;
    using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
    {
        connection.Open();
        using (var command = new SqlCommand("SP_INSERT_INTO_CATEGORIES", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.AddWithValue("@P1", category.CategoryName);
            command.Parameters.AddWithValue("@P2", category.SortOrder);
            newCreatedCategoryId = int.Parse(command.ExecuteScalar().ToString());
            command.Dispose();
        }

        connection.Close();
    }

    if (newCreatedCategoryId > 0)
    {
        Parallel.ForEach(category.Products, (product) =>
        {
            using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
            {
                connection.Open();
                using (var command = new SqlCommand("SP_INSERT_INTO_PRODUCTS", connection))
                {
                    command.CommandType = CommandType.StoredProcedure;
                    command.Parameters.AddWithValue("@P1", product.ProductName);
                    command.Parameters.AddWithValue("@P2", product.Price);
                    command.Parameters.AddWithValue("@P3", product.SortOrder);
                    command.Parameters.AddWithValue("@P4", newCreatedCategoryId);
                    command.ExecuteNonQuery();
                    command.Dispose();
                }

                connection.Close();
            }
        });
    }
});

我看了here ,但这不是我们的问题,我们已经在使用 SCOPE_IDENTITY() 来获取当前执行范围内最后生成的身份。

另一方面,即使没有 TableLock,也不允许使用 SqlBulkCopy 插入此数量的数据。

最佳答案

问题出在 newCreatedCategoryId 上,让我困惑的是你为什么调用 newCreatedCategoryId = int.Parse(command.ExecuteScalar().ToString()); 再次在内循环中。我的意思是,如果它只是类别的 id,则不需要再次递增。

看看下面的编辑。您最好将第二个 Parallel.ForEach 放入标准 foreach 我的意思是无论如何这都是并行工作的。最后,Parallel.ForEach 并不真正适合 IO 任务,正确的模式是异步和等待。说您可能会使用 TPL 数据流中的 ActionBlock 来充分利用两全其美的优势。看看我回答的这个问题中的数据流示例 Downloading 1,000+ files fast?

Parallel.ForEach(categories, (category) =>
{
    var newCreatedCategoryId = 0;
    using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
    {
        connection.Open();
        using (var command = new SqlCommand("SP_INSERT_INTO_CATEGORIES", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.AddWithValue("@P1", category.CategoryName);
            command.Parameters.AddWithValue("@P2", category.SortOrder);
            newCreatedCategoryId = int.Parse(command.ExecuteScalar().ToString());
            command.Dispose();
        }

        connection.Close();
    }

    if (newCreatedCategoryId > 0)
    {
        foreach(product in category.Products)
        {
            using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
            {
                connection.Open();
                using (var command = new SqlCommand("SP_INSERT_INTO_PRODUCTS", connection))
                {
                    command.CommandType = CommandType.StoredProcedure;
                    command.Parameters.AddWithValue("@P1", product.ProductName);
                    command.Parameters.AddWithValue("@P2", product.Price);
                    command.Parameters.AddWithValue("@P3", product.SortOrder);
                    command.Parameters.AddWithValue("@P4", newCreatedCategoryId);
                    command.Dispose();
                }

                connection.Close();
            }
        }//);
    }
});

关于插入 SQL 数据库时 C# 嵌套 Parallel.ForEach,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51057355/

相关文章:

C# 将字符串解析为 Func<T, bool>

sql - 两个不同查询 (SQL) 的结果求和

python - 将 xargs 用于并行 Python 脚本

c++ - 分离线程会导致写入撕裂吗

java - 使用 Java (netbeans) 连接到远程 SQL 服务器时出现 SSL 错误

c# - .NET 任务并行库

c# - 从字符串中获取 url

c# - WM_PAINT 在 Parallel.For 尚未完成时如何处理?

c# - 取消异步httpwebrequest

sql - TSQL BINARY_CHECKSUM 作为默认值