azure - 从 Azure Databricks 将数据保存到 Cosmos DB 的速度非常慢

标签 azure pyspark apache-spark-sql azure-cosmosdb azure-databricks

我是 Azure 和 Databricks 的初学者,我在使用 Spark 连接器将大量数据(例如 5 GB)从 Azure Databricks 保存到 Cosmos DB 时遇到困难。

前言: 我面临一个问题,因为我想用大量行(例如 5 亿+)填充 Cosmos DB 容器以进行进一步测试。目前,我有一个容器(例如容器#1),我在其中使用流分析传输遥测数据。我想要实现的是从容器 #1 中获取数据,将数据相乘多次,然后将其保存到新的容器 #2 中。我想使用 Spark 连接器和 Databricks 来解决这个问题,但是保存速度非常慢,例如。每 40 分钟 100 万行,当我想向其中保存 GB 的行时,这是 Not Acceptable 。另一方面,从容器中读取数据的速度非常快,例如。 130 万行需要 10 秒。我禁用了容器 #2 上的索引,因为我读到这应该会带来一些好处,但实际上并没有产生非常明显的差异。 容器 #2 设置为 2000 RU,容器 #1 设置为 1000 RU,两个吞吐量都设置为自动缩放。

不幸的是,我不知道我做错了什么,因此我们将不胜感激任何帮助,甚至是解决此问题的其他可能方法的建议(例如 Synapse Analytics 等)。

附注 我在某处读到,我应该首先尝试保存到 ADLS 2 存储,然后从中读取数据帧,然后读取到 Cosmos DB。它确实提供了改进,但仍然没有接近可接受的速度。

我提前为我的英语蹩脚道歉,而不是我的母语。

代码

配置

connectionConfigRead = {
  "spark.cosmos.accountEndpoint" : Endpoint,
  "spark.cosmos.accountKey" : Masterkey,
  "spark.cosmos.database" : Database,
  "spark.cosmos.container": Container,
  "spark.cosmos.read.inferSchema.enabled" : "false",
  "spark.cosmos.changeFeed.startFrom" : "Now"
}

connectionConfigWrite = {
  "spark.cosmos.accountEndpoint" : Endpoint,
  "spark.cosmos.accountKey" : Masterkey,
  "spark.cosmos.database" : Database,
  "spark.cosmos.container": Container2,
  "spark.cosmos.changeFeed.startFrom" : "Now"
}

从容器中读取

customSchema = StructType([
          StructField("iothub-connection-module-id", StringType()),
          StructField("value_type", StringType()),
          StructField("timestamp", DoubleType()),
          StructField("data", StringType()),
          StructField("IoTHub", StringType()),
          StructField("id", StringType()),
          StructField("EventEnqueuedUtcTime", StringType()),
          StructField("sensorId", StringType()),
          StructField("EventProcessedUtcTime", StringType()),
          StructField("PartitionId", IntegerType()),
          StructField("value", DoubleType())
    ])

readDF = (spark.read.schema(customSchema).format("cosmos.oltp").options(**connectionConfigRead).load())

写入容器

readDF.write.mode("append").format("cosmos.oltp").options(**connectionConfigWrite).save()

最佳答案

2 号容器没有足够的吞吐量。假设每次插入的速度为 10 RU/s,则 2000 RU/s 每秒最多可以执行 200 次插入。即 12k/分钟或 720k/小时。由于您只处理 5 GB 数据和数百万行,因此我会将自动缩放最大吞吐量扩展到 10000 RU/s,即 1000/秒或 3.6M/小时。获取有关 Spark 连接器的更多信息,Spark OLTP connector resources 。另外,请确保 Databricks 和 Cosmos DB 在同一区域中运行。这可能是导致延迟增加的原因。

如果您要查询容器#2,您需要对其建立索引。找出您用于过滤谓词、排序依据等的属性,并在需要时创建必要的范围和复合索引。

关于azure - 从 Azure Databricks 将数据保存到 Cosmos DB 的速度非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73853146/

相关文章:

python - PySpark 1.5 如何将时间戳从秒截断到最近的分钟

apache-spark - Spark 动态 DAG 比硬编码的 DAG 慢很多

python - 表之间的 PySpark 正则表达式匹配

azure - Azure-AD B2C 可以使用手机号码作为用户名吗?

python - 使用 PySpark 将 JSON 文件读取为 Pyspark Dataframe?

pyspark - py spark 成功写入无效日期但在读取时抛出异常

python - Spark(Python)中的 Kolmogorov Smirnov 测试不起作用?

azure - 使用静态 token 连接到动态

linux - 无法访问 Azure 云上 Linux VM 上托管的应用程序

asp.net-mvc - 在MVC应用程序中启用CORS(将部署在azure上)