python - 为什么我的简单Spark应用程序运行这么慢?

标签 python apache-spark hadoop pyspark

我正在尝试使用Spark API对由mllib的FP增长产生的频繁项目集进行count编码。我的Spark是版本1.5.1。以下是我的代码:

#!/usr/bin/python 
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext,SparkConf
from pyspark import HiveContext
import os
os.environ['PYSPARK_PYTHON']='/usr/bin/python'
appName = "FP_growth"
sc = SparkContext()
sql_context = HiveContext(sc)

def read_spu(prod):#prod_code):
    sql = """
        select 
        t.orderno_nosplit, 
        t.prod_code, 
        t.item_code, 
        sum(t.item_qty) as item_qty
        from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
        where t.prod_code='%s'
        group by t.prod_code, t.orderno_nosplit, t.item_code  """%prod
    spu_result = sql_context.sql(sql)
    return spu_result.cache()

if __name__ == '__main__':
    spu=read_spu('6727780')  
    conf=0.7             
    trans=spu.rdd.repartition(100).map(lambda x: (x[0],x[2])).groupByKey().mapValues(list).values().cache()
    model = FPGrowth.train(trans, 0.01, 100) 
    freq_count = model.freqItemsets().count()
    print 'freq_count:',freq_count  
    sc.stop()

输入数据是从Hadoop读取的,数据不是很大,只有大约20000行。但是,该脚本在.count阶段工作非常缓慢。我不知道为什么从性能上看,似乎是由于数据偏斜。但是输出数据并不大(每个任务仅约100KB)。

该集群有8个节点,共320个内核,总内存为1.56 T(不仅一个用户)。我的提交 Spark 的脚本是spark-submit --master yarn-cluster --executor-memory 30g --num-executors 20 --executor-cores 5 FP_growth.py
附件是运行时性能的屏幕截图:

Resource used

Active Stages

Tasks

最佳答案

repartition(100)看起来不是一个好主意,您可以检查哪个阶段花费的时间最多。由于只有20000条记录。遣返应将它们分为每个分区200条记录。

如果数据量不大,则根本不需要遣返。或者尝试使用40-60个分区(2或3)*没有执行程序。

关于python - 为什么我的简单Spark应用程序运行这么慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50523055/

相关文章:

python - 合并数组和绘图

Python循环按天计算按月分组的总数

JavaSparkStreaming | JavaSparkStreaming在awaitResult中抛出异常

java - 将之前写入 HDFS 的 lucene 索引加载到 RamDirectory

python - 将 Python Pandas 数据框行与加法结合起来

python - Python使用装饰器和继承注册类方法

apache-spark - 了解各个阶段的 Spark 终端输出

scala - Spark : Parse a Date/Timestamps with different Formats (MM-dd-yyyy HH:mm, MM/dd/yy H:mm ) 在 Dataframe 的同一列

hadoop - 如何删除hive hadoop数据库中的行

hadoop - 统计hdfs中压缩文件的个数