postgresql - Spark 从 Postgres JDBC 表读取速度慢

标签 postgresql apache-spark jdbc pyspark apache-spark-sql

我正在尝试将大约 100 万行从 PostgreSQL 数据库加载到 Spark 中。使用 Spark 时大约需要 10 秒。但是,使用 psycopg2 驱动程序加载相同的查询需要 2 秒。我正在使用 postgresql jdbc 驱动程序版本 42.0.0

def _loadFromPostGres(name):
    url_connect = "jdbc:postgresql:"+dbname
    properties = {"user": "postgres", "password": "postgres"}
    df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
    return df

df = _loadFromPostGres("""
    (SELECT "seriesId", "companyId", "userId", "score" 
    FROM user_series_game 
    WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

输出是-

--- 10.7214591503 seconds ---
1076131

使用 psycopg2 -

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
    cur.execute("""(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298)""")
    return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

输出是-

--- 2.27961301804 seconds ---
1076131

测量函数-

def measure(func) :
    start_time = time.time()
    x = func()
    print("--- %s seconds ---" % (time.time() - start_time))
    return x

请帮我找出这个问题的原因。


编辑 1

我做了更多的基准测试。使用 Scala 和 JDBC -

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long, Long, Long, Double)]()

while (rs.next()) {
    val seriesId = rs.getLong("seriesId")
    val companyId = rs.getLong("companyId")
    val userId = rs.getLong("userId")
    val score = rs.getDouble("score")
    list.append((seriesId, companyId, userId, score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

输出是-

Elapsed time: 1.922102285s
1143402

当我在 Spark + Scala 中执行 collect() 时 -

import org.apache.spark.sql.SparkSession

def exec2() {

    val spark = SparkSession.builder().getOrCreate()

    val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

    val sqlText = """(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298) as user_series_game"""

    val t0 = System.nanoTime()

    val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", sqlText)
          .option("user", "postgres")
          .option("password", "postgres")
          .load()

    val list = df.collect()

    val t1 = System.nanoTime()

    println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

    print (list.size)
}

exec2()

输出是

Elapsed time: 1.486141076s
1143445

因此在 Python 序列化中花费了 4 倍的额外时间。我知道会有一些惩罚,但这似乎太多了。

最佳答案

原因很简单,同时有两个原因。

首先,我将向您介绍 psycopg2 的工作原理。

这个库 psycopg2 像任何其他库一样连接到 RDMS。该库会将查询发送到您的 postgres 引擎,并将数据返回给您。像这样直接前进。

Conn -> Query -> ReturnData -> FetchData

当您使用 spark 时,有两种方式略有不同。 Spark 不像一种在单个线程中运行的编程语言。它有一个分布式系统来工作。即使你在本地机器上运行。请参阅 Spark 具有 Driver(Master) 和 Workers 的基本概念。

Driver 接收请求执行对 Postgres 的查询,Driver 不会请求每个 worker 从你的 Postgres 请求信息的数据。

如果您看到文档 here你会看到这样的注释:

Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

这个注释意味着每个工作人员将负责为您的 postgres 请求数据。这是开始这个​​过程的一个小开销,但没什么大不了的。但是这里有一个开销,将数据发送给每个工作人员。

第二点,你在这部分代码中的收集:

print measure(lambda : len(df.collect()))

collect 函数将发送一条命令,让您的所有工作人员将数据发送到您的驱动程序。要存储在驱动程序的内存中,它就像一个 Reduce,它会在进程中间创建 Shuffle。洗牌是将数据发送给其他工作人员的过程的步骤。在收集的情况下,每个 worker 都会将其发送给您的司机。

那么你代码的JDBC中Spark的步骤是:

(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver) Request the Data -> (Workers) Shuffle -> (Driver) Collect

好吧,Spark 还发生了很多其他事情,比如 QueryPlan、构建 DataFrame 和其他事情。

这就是您在简单的 Python 代码中比 Spark 响应更快的原因。

关于postgresql - Spark 从 Postgres JDBC 表读取速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43533751/

相关文章:

spring - 在 Spring Boot 中使用 HQL 查询

scala - 如何在Spark中的执行器之间同步功能以避免在写入Elastic时并发

python - 使用工作流在 github 中对 databricks python 代码进行 flake8 linting

java - 从 JDBC 请求捕获值并用于下一个查询

c# - 如何修复 Mono 3.0.3、Entity Framework 6 Beta2 和 Npgsql 上 MVC3 项目的 DbProviderServices 继承错误?

sql - 如何在 PostgreSQL 中以日期格式本身添加 AM 或 PM?

SQL查找树中的所有直接后代

python - 使用Python从HDFS目录中读取文件并在Spark中创建RDD

java - 使用 jdbc 时的日志记录问题

android - 使用Android连接mysql数据库