scala - 使用谓词下推连接两个数据集

标签 scala apache-spark hbase apache-spark-sql phoenix

我有一个从 RDD 创建的数据集,并尝试将它与另一个从我的 Phoenix 表创建的数据集连接起来:

val dfToJoin = sparkSession.createDataset(rddToJoin)
val tableDf = sparkSession
  .read
  .option("table", "table")
  .option("zkURL", "localhost")
  .format("org.apache.phoenix.spark")
  .load()
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")

当我执行它时,似乎整个数据库表都被加载来进行连接。

有没有办法进行这样的连接,以便在数据库上而不是在 spark 上完成过滤?

另外:dfToJoin 比表小,我不知道这是否重要。

编辑:基本上我想将我的 Phoenix 表与通过 spark 创建的数据集连接起来,而不是将整个表提取到执行程序中。

Edit2:这是物理计划:
*Project [FEATURE#21, SEQUENCE_IDENTIFIER#22, TAX_NUMBER#23, 
         WINDOW_NUMBER#24, uniqueIdentifier#5, readLength#6]
 +- *SortMergeJoin [FEATURE#21], [feature#4], Inner
     :- *Sort [FEATURE#21 ASC NULLS FIRST], false, 0
     :  +- Exchange hashpartitioning(FEATURE#21, 200)
     :     +- *Filter isnotnull(FEATURE#21)
     :        +- *Scan PhoenixRelation(FEATURES,localhost,false) 

    [FEATURE#21,SEQUENCE_IDENTIFIER#22,TAX_NUMBER#23,WINDOW_NUMBER#24] 
    PushedFilters: [IsNotNull(FEATURE)], ReadSchema: 

    struct<FEATURE:int,SEQUENCE_IDENTIFIER:string,TAX_NUMBER:int,
    WINDOW_NUMBER:int>
   +- *Sort [feature#4 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(feature#4, 200)
     +- *Filter isnotnull(feature#4)
        +- *SerializeFromObject [assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).feature AS feature#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).uniqueIdentifier, true) AS uniqueIdentifier#5, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).readLength AS readLength#6]
           +- Scan ExternalRDDScan[obj#3]

正如您所看到的,推送过滤器列表中不包含 equals-filter,因此很明显没有发生谓词下推。

最佳答案

Spark will fetch the Phoenix table records to appropriate executors(not the entire table to one executor)



由于没有直接filter在 Phoenix 表 df 上,我们只看到 *Filter isnotnull(FEATURE#21)在物理计划中。

正如您所提到的,当您对其应用过滤器时,Phoenix 表数据会减少。您将过滤器推送到 feature 上的 Phoenix table 通过查找列feature_ids在其他数据集中。
//This spread across workers  - fully distributed
val dfToJoin = sparkSession.createDataset(rddToJoin)

//This sits in driver - not distributed
val list_of_feature_ids = dfToJoin.dropDuplicates("feature")
  .select("feature")
  .map(r => r.getString(0))
  .collect
  .toList

//This spread across workers  - fully distributed
val tableDf = sparkSession
  .read
  .option("table", "table")
  .option("zkURL", "localhost")
  .format("org.apache.phoenix.spark")
  .load()
  .filter($"FEATURE".isin(list_of_feature_ids:_*)) //added filter

//This spread across workers  - fully distributed
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")

joinedDf.explain()

关于scala - 使用谓词下推连接两个数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46303489/

相关文章:

scala - 与抽象类相比,使用特征有什么优点?

java - Apache Spark : what's the designed behavior if master fails

apache-spark - 如何在 Spark 2.1 中启用 tungsten sort shuffle?

scala - 在 Spark 作业中写入 HBase : a conundrum with existential types

graph - 将多个顺序 HBase 查询的结果传递给 Mapreduce 作业

scala - 使用scala添加其他列的长度作为值的列

scala - 不带 UDF 的 Spark 数据集的加权平均值

scala - 如何创建所有 Row 值的逗号分隔字符串

scala - 为 Spark 作业的单元测试模拟 HTable 数据

hadoop - 如何在hbase中查找行键中的列数