mysql - Spark SQL/Hive 查询永远伴随着 Join

标签 mysql apache-spark apache-spark-sql

所以我正在做一些应该很简单的事情,但显然它不在 Spark SQL 中。

如果我在 MySQL 中运行以下查询,查询会在几分之一秒内完成:

SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;

但是,在 Spark (1.5.1) 下的 HiveContext 中运行相同的查询需要超过 13 秒。添加更多连接会使查询运行很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。

这些表是作为临时表加载到 Hive 上下文中的 MySQL 表。它在单个实例中运行,数据库在远程机器上。

  • user 表有大约 480 万行。
  • user_address 表有 350,000 行。

表有外键字段,但数据库中没有定义明确的外键关系。我正在使用 InnoDB。

Spark 中的执行计划:

计划:

Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, {user=, password=, url=jdbc:mysql://, dbtable=user}) [address_id#0L,user_address_id#27L]

Filter (user_id#0L = 123) Scan JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]

ConvertToUnsafe ConvertToUnsafe

TungstenExchange hashpartitioning(address_id#52L) TungstenExchange hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L ASC], false, 0 TungstenSort [user_address_id#27L ASC], false, 0

SortMergeJoin [user_address_id#27L], [address_id#52L]

== Physical Plan == TungstenProject [address_id#0L]

最佳答案

首先,您执行的查询类型非常低效。至于现在(Spark 1.5.0*)要像这样执行连接,每次执行查询时都必须对两个表进行混洗/散列分区。在 users 表的情况下,这应该不是问题,其中 user_id = 123 谓词很可能被下推,但仍需要对 user_address 进行完全洗牌>。

此外,如果表只被注册而不被缓存,那么每次执行这个查询都会从 MySQL 中获取整个 user_address 表到 Spark。

I'm not sure what I'm doing wrong here and how I can speed things up.

目前还不清楚为什么要将 Spark 用于应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。

一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常运行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。

如果单个表/数据框小得多,您可以尝试广播。

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val user: DataFrame = ???
val user_address: DataFrame = ???

val userFiltered = user.where(???)

user_addresses.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")

* 这应该在 Spark 1.6.0 中更改为 SPARK-11410这应该启用持久表分区。

关于mysql - Spark SQL/Hive 查询永远伴随着 Join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34036286/

相关文章:

apache-spark - 什么时候在 spark 中执行 REFRESH TABLE my_table?

java - Spark Java API,数据集操作?

apache-spark - PySpark-列中的to_date格式

mysql - 使用 SparkSQL 删除 MySQL 表

mysql - 如何添加引用 "limited length"的 2 列主键的外键

mysql - 更新mysql中的日期列

java - 分别在SPARK中处理多个文件

scala - 如何将每种类型的列分成两组?

wordpress - 具有多个条件的 MYSQL 语句 INNER JOIN

mysql - MySQL 中的 SELECT DISTINCT 语句需要 10 分钟