apache-spark - 从 google pubsub 到 spark streaming 的数据摄取很慢

标签 apache-spark google-bigquery spark-streaming google-cloud-pubsub google-cloud-dataproc

我正在使用谷歌云 Dataproc Spark 集群来运行 Spark 流作业,它从多个 PubSub 订阅中读取数据并写入 BigQuery。 PubSub 有 500 万个元素,滑动窗口为 2 分钟,批处理/窗口为 30 秒,我每批处理只得到大约 200,000 个元素。我希望第一批全部拿到500万。每个元素的大小约为 140 字节,采用 Avro 消息格式。

我在 Dataflow 中实现了每秒 100 万个元素的速度,但我想在 Dataproc 中实现同样的速度。我尝试使用 Dataproc 的自动缩放选项,还尝试使用在 Dataflow 上工作的相同 Beam 管道代码。如果我增加订阅数量,那么它可能会给我更多的吞吐量。是否有可能从单个订阅中获得 1M 元素/秒的吞吐量?

以下是我的 Scala 代码:

// Reading from multiple PubSub.
for (a <- 0 to Integer.parseInt(subs)) {
  logger.info("SKCHECK : Creating stream : " + subscription + a)
  val everysub  = PubsubUtils.createStream(
      ssc, projectId, None, subscription + a,
      SparkGCPCredentials.builder.jsonServiceAccount(jsonPath).build(),
      StorageLevel.MEMORY_ONLY_SER).map(message => {
          // Method to send avro bytes message and get row
          val row : Row = avroMsgToRow(message.getData())
          row
      })
}

我的 build.sbt 看起来像:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion,
     // "org.apache.spark" %% "spark-mllib" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
     // "org.apache.spark" %% "spark-hive" % sparkVersion,
      "com.google.cloud" % "google-cloud-bigquery" % bigQueryVersion,
      "com.google.apis" % "google-api-services-bigquery" % googleApiBigQueryVersion,
      "com.google.cloud" % "google-cloud-nio" % gcsNioVersion,
      "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
    )

    // https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/bigquery-connector
    libraryDependencies += "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.10.0-hadoop2"

    // https://mvnrepository.com/artifact/com.spotify/spark-bigquery
    libraryDependencies += "com.spotify" %% "spark-bigquery" % "0.2.2"

    libraryDependencies += "com.google.apis" % "google-api-services-pubsub" % "v1-rev425-1.25.0"

    // https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-pubsub
    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.3.0"


    // https://mvnrepository.com/artifact/org.scala-lang/scala-library
    libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.0-M3"

    // https://mvnrepository.com/artifact/org.apache.spark/spark-avro
    libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"

如果您需要更多信息,请告诉我。

我希望通过单个 PubSub 订阅获得每秒 100 万个元素的数据摄取速度。

最佳答案

我认为您需要首先确定您的 Spark Streaming 作业的瓶颈。是 CPU 限制、内存限制、IO 限制还是因为 Spark 的某些参数导致它没有充分利用资源?我建议您首先检查资源利用率,然后尝试不同的 machine types .

关于apache-spark - 从 google pubsub 到 spark streaming 的数据摄取很慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57145527/

相关文章:

apache-spark - Spark Dataframe列,另一列的最后一个字符

java - 使用java在spark中的函数之间

apache-spark - Spark 独立集群 :Configuring Distributed File System

json - Spark Streaming 中的 UnknownHostException 错误

apache-spark - Spark Streaming 作业运行速度非常慢

python - python : bugs in countByValue and countByValueAndWindow? 中的 Spark 流式传输

java - Spark - 将 scala 转换为 java

google-analytics - Hive 表中 Google Analytics Report 和 BigQuery Data 的统计数据差异

json - 在 Bigquery json_extract() 函数中转义字符

google-bigquery - 从同一个表中取消嵌套多个字段 - BigQuery