python - 为什么即使使用 limit 命令访问结果,SPARK\PYSPARK 也会计算所有内容?

标签 python apache-spark pyspark

我将 mysql 表存储在 dataframe_mysql 中

dataframe_mysql = sqlContext.read.format("jdbc").options(...
dataframe_mysql.registerTempTable('dataf')
groupedtbl=sqlContext.sql("""SELECT job_seq_id,first(job_dcr_content) as firststage,last(job_dcr_content) as laststage,
                          first(from_stage) as source, last(from_stage) as target , count(jid) as noofstages from dataf group by job_seq_id having count(jid)>1""" )



from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
func1 = udf(fu1, StringType())
func2= udf(fu2, StringType())
res1=groupedtbl.withColumn('dcol',func1(groupedtbl.firststage,groupedtbl.lastage,groupedtbl.job_seq_id))
res2=res1.withColumn('lcol',func2(res1.dcol,res1.job_seq_id))

对于上面的代码,我看到即使我发出限制命令:

lb=res2.limit(2).collect()

或以下命令只获取一条记录的结果:

labels.filter(res2.job_seq_id==5843064)

它不是仅仅在第一个查询中获得两个结果或在第二个查询中获得一个结果,而是对其他行进行了大量不必要的计算,即使只需要两行也会浪费时间。我可以从内部日志中看到这一点,即使只是获取两行计算 100 行,然后从中检索两个结果行。我虽然 DAG 机制应该处理这个问题,但它似乎没有,我在这个观察中是不是错了?

最佳答案

这里有很多不同的问题。有些与您使用的数据源有关,有些与查询有关,最后还有一些是通过将 Python UDF 与 Spark < 2.0.0 一起使用引入的。

一步一步:

  • 使用 JDBC 数据源时,只有简单的谓词被下推到数据源。它仅表示主要 WHERE 子句内的条件(不在 HAVING 或其他计算字段内)。包括聚合和限制在内的所有其他内容都发生在 Spark 内部(请参阅 Does spark predicate pushdown work with JDBC?More than one hour to execute pyspark.sql.DataFrame.take(4))。

  • 如果没有显式分区,Spark 没有关于数据分布的先验知识。这意味着任何需要数据聚合的代码都必须访问所有记录。因此,具有聚合的 limit 子句只能在执行计划中聚合之后存在。这意味着:

    res2.limit(2)
    

    无法优化。

  • PySpark UDF in Spark < 2.0.0 在执行计划中引入隐式断点:

    from pyspark.sql.functions import col, udf
    
    options = ...
    df = sqlContext.read.format("jdbc").options(**options).load()
    
    df.printSchema()
    ## root
    ##  |-- x: integer (nullable = true)
    ##  |-- y: integer (nullable = true)
    

    如果没有 BatchPythonEvaluation,可以看到 predicate 被下推了

    df.groupBy("x").sum("y").where(col("x") > 2).explain()
    ## == Physical Plan ==
    ## TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Final,isDistinct=false)], output=[x#182,sum(y)#192L])
    ## +- TungstenExchange hashpartitioning(x#182,200), None
    ##    +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Partial,isDistinct=false)], output=[x#182,sum#197L])
    ##       +- Filter (x#182 > 2)
    ##          +- Scan JDBCRelation(...,point,[Lorg.apache.spark.Partition;@61ee5d1a,{...})[x#182,y#183] PushedFilters: [GreaterThan(x,2)]
    

    但在添加 UDF 调用时不是,即使未使用输出也是如此

    identity = udf(lambda x: x)
    
    df.groupBy("x").sum("y").withColumn("foo", identity(col("x"))).where(col("x") > 2).explain()
    == Physical Plan ==
    ## Project [x#182,sum(y)#214L,pythonUDF#216 AS foo#215]
    ## +- Filter (x#182 > 2)
    ##    +- !BatchPythonEvaluation PythonUDF#<lambda>(x#182), [x#182,sum(y)#214L,pythonUDF#216]
    ##       +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Final,isDistinct=false)], output=[x#182,sum(y)#214L])
    ##          +- TungstenExchange hashpartitioning(x#182,200), None
    ##             +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Partial,isDistinct=false)], output=[x#182,sum#219L])
    ##                +- Scan JDBCRelation(...,point,[Lorg.apache.spark.Partition;@61ee5d1a,{...})[x#182,y#183]
    

    此行为已在 Spark 2.0.0 中进行了优化。

关于python - 为什么即使使用 limit 命令访问结果,SPARK\PYSPARK 也会计算所有内容?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37564073/

相关文章:

python - matplotlib 3d 线框图

python - While 循环内的输入函数

apache-spark - 在 Spark Streaming 中,如何检测空批处理?

apache-spark - 调用 LogisticRegressionModelWithLBFGS.train 时出现 Py4JavaError

python - 命名元组实例的酸洗正常成功,但在模块被 Cythonized 时失败

python - Django将两个模型实例合并为一个实例以供查看

scala - 为什么 spark blas 在 1 级例程中使用 f2jBLAS 而不是原生 BLAS?

python - 何时在 PySpark 中使用 UDF 与函数?

apache-spark - 将结构数组扩展为PySpark中的列

python - 在pyspark中将数据帧转换为字符串