我正在尝试将大约 100 万行从 PostgreSQL 数据库加载到 Spark 中。使用 Spark 时大约需要 10 秒。但是,使用 psycopg2 驱动程序加载相同的查询需要 2 秒。我正在使用 postgresql jdbc 驱动程序版本 42.0.0
def _loadFromPostGres(name):
url_connect = "jdbc:postgresql:"+dbname
properties = {"user": "postgres", "password": "postgres"}
df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
return df
df = _loadFromPostGres("""
(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as
user_series_game""")
print measure(lambda : len(df.collect()))
输出是-
--- 10.7214591503 seconds ---
1076131
使用 psycopg2 -
import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
def _exec():
cur.execute("""(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298)""")
return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()
输出是-
--- 2.27961301804 seconds ---
1076131
测量函数-
def measure(func) :
start_time = time.time()
x = func()
print("--- %s seconds ---" % (time.time() - start_time))
return x
请帮我找出这个问题的原因。
编辑 1
我做了更多的基准测试。使用 Scala 和 JDBC -
import java.sql._;
import scala.collection.mutable.ArrayBuffer;
def exec() {
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val conn = DriverManager.getConnection(url,"postgres","postgres");
val sqlText = """SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298"""
val t0 = System.nanoTime()
val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val rs = stmt.executeQuery()
val list = new ArrayBuffer[(Long, Long, Long, Double)]()
while (rs.next()) {
val seriesId = rs.getLong("seriesId")
val companyId = rs.getLong("companyId")
val userId = rs.getLong("userId")
val score = rs.getDouble("score")
list.append((seriesId, companyId, userId, score))
}
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
println(list.size)
rs.close()
stmt.close()
conn.close()
}
exec()
输出是-
Elapsed time: 1.922102285s
1143402
当我在 Spark + Scala 中执行 collect() 时 -
import org.apache.spark.sql.SparkSession
def exec2() {
val spark = SparkSession.builder().getOrCreate()
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val sqlText = """(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as user_series_game"""
val t0 = System.nanoTime()
val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", sqlText)
.option("user", "postgres")
.option("password", "postgres")
.load()
val list = df.collect()
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
print (list.size)
}
exec2()
输出是
Elapsed time: 1.486141076s
1143445
因此在 Python 序列化中花费了 4 倍的额外时间。我知道会有一些惩罚,但这似乎太多了。
最佳答案
原因很简单,同时有两个原因。
首先,我将向您介绍 psycopg2
的工作原理。
这个库 psycopg2
像任何其他库一样连接到 RDMS。该库会将查询发送到您的 postgres 引擎,并将数据返回给您。像这样直接前进。
Conn -> Query -> ReturnData -> FetchData
当您使用 spark 时,有两种方式略有不同。 Spark 不像一种在单个线程中运行的编程语言。它有一个分布式系统来工作。即使你在本地机器上运行。请参阅 Spark 具有 Driver(Master) 和 Workers 的基本概念。
Driver 接收请求执行对 Postgres 的查询,Driver 不会请求每个 worker 从你的 Postgres 请求信息的数据。
如果您看到文档 here你会看到这样的注释:
Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.
这个注释意味着每个工作人员将负责为您的 postgres 请求数据。这是开始这个过程的一个小开销,但没什么大不了的。但是这里有一个开销,将数据发送给每个工作人员。
第二点,你在这部分代码中的收集:
print measure(lambda : len(df.collect()))
collect 函数将发送一条命令,让您的所有工作人员将数据发送到您的驱动程序。要存储在驱动程序的内存中,它就像一个 Reduce,它会在进程中间创建 Shuffle。洗牌是将数据发送给其他工作人员的过程的步骤。在收集的情况下,每个 worker 都会将其发送给您的司机。
那么你代码的JDBC中Spark的步骤是:
(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver) Request the Data -> (Workers) Shuffle -> (Driver) Collect
好吧,Spark 还发生了很多其他事情,比如 QueryPlan、构建 DataFrame 和其他事情。
这就是您在简单的 Python 代码中比 Spark 响应更快的原因。
关于postgresql - Spark 从 Postgres JDBC 表读取速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43533751/