apache-spark - Spark 写入 postgres 很慢

标签 apache-spark dataframe apache-spark-sql

我正在将数据帧中的数据(大约 83M 条记录)写入 postgresql 中,但速度有点慢。完成写入数据库需要 2.7 小时。

查看执行程序,只有一个执行程序在运行一项事件任务。有什么方法可以使用 Spark 中的所有执行程序将写入并行化到 db 中吗?

...
val prop = new Properties()
prop.setProperty("user", DB_USER)
prop.setProperty("password", DB_PASSWORD)
prop.setProperty("driver", "org.postgresql.Driver")



salesReportsDf.write
              .mode(SaveMode.Append)
              .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, prop)

谢谢

最佳答案

所以我想出了问题。基本上,重新分区我的数据帧会将数据库写入吞吐量提高 100%

def srcTable(config: Config): Map[String, String] = {

  val SERVER             = config.getString("db_host")
  val PORT               = config.getInt("db_port")
  val DATABASE           = config.getString("database")
  val USER               = config.getString("db_user")
  val PASSWORD           = config.getString("db_password")
  val TABLE              = config.getString("table")
  val PARTITION_COL      = config.getString("partition_column")
  val LOWER_BOUND        = config.getString("lowerBound")
  val UPPER_BOUND        = config.getString("upperBound")
  val NUM_PARTITION      = config.getString("numPartitions")

  Map(
    "url"     -> s"jdbc:postgresql://$SERVER:$PORT/$DATABASE",
    "driver"  -> "org.postgresql.Driver",
    "dbtable" -> TABLE,
    "user"    -> USER,
    "password"-> PASSWORD,
    "partitionColumn" -> PARTITION_COL,
    "lowerBound" -> LOWER_BOUND,
    "upperBound" -> UPPER_BOUND,
    "numPartitions" -> NUM_PARTITION
  )

}

关于apache-spark - Spark 写入 postgres 很慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39396886/

相关文章:

scala - 与take(10)和limit(10).collect()的性能比较

scala - Spark java.lang.NoClassDefFoundError : org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

apache-spark-sql - Spark SQL : How to consume json data from a REST service as DataFrame

hadoop - 'Provided' 是哪些特定的 Spark 库?

apache-spark - PySpark 如何按值排序(如果值相等则按键排序)?

python - 将新列迭代添加到具有唯一列名称的数据框

Python:使用 Pandas 从数据框中选择特定日期

java - 将 JavaRDD 字符串转换为 JavaRDD vector

java - Spark java.lang.StackOverflowError

python - 当有多个规范时,在 Pandas 中优化计算的最佳做法是什么?