apache-spark - 多次运行时在同一组数据上不同的Pyspark代码输出

标签 apache-spark hadoop pyspark

我已经编写了一个pyspark函数,但是当我多次运行时,每次输出不同时都会给我
在同一组输入数据上。

-pyspark函数

def give_percentile(plat,metrics,perc):
df_perc = df_final.filter(df_final.platform.like('%' + plat + '%'))
df_perc = df_perc.filter(df_final[metrics]!=0)
percentile_val = df_perc.approxQuantile(metrics, [perc], 0.05)
if len(percentile_val)>0:
    percentile_val = float(percentile_val[0])
else:
    percentile_val = float(0)
return percentile_val

调用函数
   df_agg = sqlContext.createDataFrame([Row(platform='iOS',
                                        percentile_page_load_50=give_percentile("iOS","page_load",0.5),
                                        percentile_time_diff_50=give_percentile("iOS","session_duration",0.5)),
                                        Row(platform='Android',
                                        percentile_page_load_50=give_percentile("Android","page_load",0.5),
                                        percentile_time_diff_50=give_percentile("Android","session_duration",0.5)),
                                        Row(platform='Web',
                                        percentile_page_load_50=give_percentile("Web","page_load",0.5),
                                        percentile_time_diff_50=give_percentile("Web","session_duration",0.5)))

Spark Job提交:-
    spark-submit --deploy-mode cluster  --executor-cores 4 --executor-memory 12G --driver-cores 4 --driver-memory 12G --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC "path"

我们以Parquet文件格式存储pyspark代码的输出,并在此之上创建impala表,如下所示:

1.从表名称1 a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue'并
a.dt ='20190501'限制5;
表记录数= 22093826

输出= 0.62400001287460327
0.35100001096725464

2.从表名称2a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue',
a.dt ='20190501'限制5;
表记录数= 22093826

输出= 0.61500000953674316
0.28499999642372131
3.从表名3a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue',
a.dt ='20190501'限制5;
表记录数= 22093826

输出= 0.61799997091293335
0.27799999713897705

现在在这里,Tablename1,Tablename2和Tablename3是在同一组输入数据上多次运行pyspark代码的输出。
但由于我们的pyspark代码在cluser模式/分布式模式下运行,因此值仍然不同。当我们检查样本数据时
独立模式下,其值未更改。
因此,您能否在这里帮助我,并告诉我上述功能代码或任何其他群集问题有什么问题?

最佳答案

根据给定的relativeError函数,roximateile函数可为您提供近似的解决方案。您将roximateQuantile函数的允许relativeError设置为0.05,这意味着它仅在以下范围内具有确定性:

“如果DataFrame具有N个元素,并且如果我们以概率p要求分位数直到错误err,则该算法将从DataFrame返回样本x ,以使x的精确等级接近(p * N)。 ” (我强调了为什么您得到不同结果的部分)。

如果需要精确的分位数,则必须将relativeError设置为0.0,但这也会增加运行时间。
可以在documentation中找到更多信息。

关于apache-spark - 多次运行时在同一组数据上不同的Pyspark代码输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56215992/

相关文章:

apache-spark - BlockManager 进程的 Spark Connection 被拒绝

hadoop - 插入 Hive 表时如何从 HDFS 中选择动态文件名

hadoop - 在 Spark 中将简单的 RDD 写入 DynamoDB

scala - 如何设计 Spark 应用,让 Shuffle 数据在一些迭代后自动清理

hadoop - 是否可以创建具有文本输出格式的配置单元表?

hadoop - 选择性足以仅考虑HIVE文件中很少的值

python - 如何格式化pyspark中的数字列?

python - 使用第一个非缺失值向前填充行

python - 如何从多个列表创建 pyspark 数据框

apache-spark - Spark 任务不可序列化 Hadoop-MongoDB-Connector 安然