我已经编写了一个pyspark函数,但是当我多次运行时,每次输出不同时都会给我
在同一组输入数据上。
-pyspark函数
def give_percentile(plat,metrics,perc):
df_perc = df_final.filter(df_final.platform.like('%' + plat + '%'))
df_perc = df_perc.filter(df_final[metrics]!=0)
percentile_val = df_perc.approxQuantile(metrics, [perc], 0.05)
if len(percentile_val)>0:
percentile_val = float(percentile_val[0])
else:
percentile_val = float(0)
return percentile_val
调用函数
df_agg = sqlContext.createDataFrame([Row(platform='iOS',
percentile_page_load_50=give_percentile("iOS","page_load",0.5),
percentile_time_diff_50=give_percentile("iOS","session_duration",0.5)),
Row(platform='Android',
percentile_page_load_50=give_percentile("Android","page_load",0.5),
percentile_time_diff_50=give_percentile("Android","session_duration",0.5)),
Row(platform='Web',
percentile_page_load_50=give_percentile("Web","page_load",0.5),
percentile_time_diff_50=give_percentile("Web","session_duration",0.5)))
Spark Job提交:-
spark-submit --deploy-mode cluster --executor-cores 4 --executor-memory 12G --driver-cores 4 --driver-memory 12G --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC "path"
我们以Parquet文件格式存储pyspark代码的输出,并在此之上创建impala表,如下所示:
1.从表名称1 a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue'并
a.dt ='20190501'限制5;
表记录数= 22093826
输出= 0.62400001287460327
0.35100001096725464
2.从表名称2a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue',
a.dt ='20190501'限制5;
表记录数= 22093826
输出= 0.61500000953674316
0.28499999642372131
3.从表名3a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue',
a.dt ='20190501'限制5;
表记录数= 22093826
输出= 0.61799997091293335
0.27799999713897705
现在在这里,Tablename1,Tablename2和Tablename3是在同一组输入数据上多次运行pyspark代码的输出。
但由于我们的pyspark代码在cluser模式/分布式模式下运行,因此值仍然不同。当我们检查样本数据时
独立模式下,其值未更改。
因此,您能否在这里帮助我,并告诉我上述功能代码或任何其他群集问题有什么问题?
最佳答案
根据给定的relativeError函数,roximateile函数可为您提供近似的解决方案。您将roximateQuantile函数的允许relativeError设置为0.05,这意味着它仅在以下范围内具有确定性:
“如果DataFrame具有N个元素,并且如果我们以概率p要求分位数直到错误err,则该算法将从DataFrame返回样本x ,以使x的精确等级接近(p * N)。 ” (我强调了为什么您得到不同结果的部分)。
如果需要精确的分位数,则必须将relativeError设置为0.0,但这也会增加运行时间。
可以在documentation中找到更多信息。
关于apache-spark - 多次运行时在同一组数据上不同的Pyspark代码输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56215992/