所以我正在做一些应该很简单的事情,但显然它不在 Spark SQL 中。
如果我在 MySQL 中运行以下查询,查询会在几分之一秒内完成:
SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;
但是,在 Spark (1.5.1) 下的 HiveContext 中运行相同的查询需要超过 13 秒。添加更多连接会使查询运行很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。
这些表是作为临时表加载到 Hive 上下文中的 MySQL 表。它在单个实例中运行,数据库在远程机器上。
- user 表有大约 480 万行。
- user_address 表有 350,000 行。
表有外键字段,但数据库中没有定义明确的外键关系。我正在使用 InnoDB。
Spark 中的执行计划:
计划:
Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, {user=, password=, url=jdbc:mysql://, dbtable=user}) [address_id#0L,user_address_id#27L]
Filter (user_id#0L = 123) Scan JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]
ConvertToUnsafe ConvertToUnsafe
TungstenExchange hashpartitioning(address_id#52L) TungstenExchange hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L ASC], false, 0 TungstenSort [user_address_id#27L ASC], false, 0
SortMergeJoin [user_address_id#27L], [address_id#52L]
== Physical Plan == TungstenProject [address_id#0L]
最佳答案
首先,您执行的查询类型非常低效。至于现在(Spark 1.5.0*)要像这样执行连接,每次执行查询时都必须对两个表进行混洗/散列分区。在 users
表的情况下,这应该不是问题,其中 user_id = 123
谓词很可能被下推,但仍需要对 user_address
进行完全洗牌>。
此外,如果表只被注册而不被缓存,那么每次执行这个查询都会从 MySQL 中获取整个 user_address
表到 Spark。
I'm not sure what I'm doing wrong here and how I can speed things up.
目前还不清楚为什么要将 Spark 用于应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。
一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常运行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。
如果单个表/数据框小得多,您可以尝试广播。
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
val user: DataFrame = ???
val user_address: DataFrame = ???
val userFiltered = user.where(???)
user_addresses.join(
broadcast(userFiltered), $"address_id" === $"user_address_id")
* 这应该在 Spark 1.6.0 中更改为 SPARK-11410这应该启用持久表分区。
关于mysql - Spark SQL/Hive 查询永远伴随着 Join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34036286/