apache-spark - 为什么我的count,Distinct和Distinct count在spark的巨大集群中非常慢

标签 apache-spark apache-spark-sql

我有一个非常巨大的集群,包含 20 个 m4.Xlarge 实例。 我的文件大小为 20GB,文件中的记录数为 193944092。

从这个文件中我需要三个信息。 1. 记录总数 2. 不同记录总数 3. 基于一列的不同记录总数 (FundamentalSeriesId)。

当我运行下面的代码时,需要很长时间。计算记录总数需要 7 分钟。

但是对于 FundamentalSeriesId 列的不同记录总数和不同记录总数,它花费了很长时间,我的意思是我已经取消了查询,因为它花费了很长时间。

如果有人可以改进我的代码,那就太好了。我可以使用缓存或其他东西来更快地获取信息吗?

这就是我正在做的

    val rdd = sc.textFile("s3://kishore-my-bucket-trf/Fundamental.FundamentalAnalytic.FundamentalAnalytic.SelfSourcedPublic.2011.1.2018-02-18-1340.Full.txt.gz")
println("Total count="+rdd.count())

val header = rdd.filter(_.contains("FundamentalSeriesId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("FundamentalSeriesId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

println("distinct count="+data.distinct.count())

val data1=data.select($"FundamentalSeriesId")
println("count of distinct FundamentalSeriesId column="+data1.distinct.count())

我的样本记录是这样的..

FundamentalSeriesId|^|FundamentalSeriesId.objectTypeId|^|FundamentalSeriesId.objectType_1|^|financialPeriodEndDate|^|financialPeriodType|^|lineItemId|^|analyticItemInstanceKey_1|^|AnalyticValue_1|^|AnalyticConceptCode_1|^|AnalyticValue.currencyId_1|^|AnalyticIsEstimated_1|^|AnalyticAuditabilityEquation_1|^|FinancialPeriodTypeId_1|^|AnalyticConceptId_1|^|sYearToDate|^|IsAnnual_1|^|TaxonomyId_1|^|InstrumentId_1|^|AuditID_1|^|PhysicalMeasureId_1|^|FFAction_1

最佳答案

Distinct 是 Spark 中的常见问题,如果可以的话,请使用 countApproxDistinct 代替。

关于apache-spark - 为什么我的count,Distinct和Distinct count在spark的巨大集群中非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48931645/

相关文章:

apache-spark - Apache Spark 按 DF 分组,将值收集到列表中,然后按列表分组

apache-spark - 为什么 SparkContext 会随机关闭,如何从 Zeppelin 重新启动?

apache-spark-sql - 在spark sql中选择除特定列之外的所有内容

apache-spark - pyspark 预期构建 ClassDict 的参数为零(对于 pyspark.mllib.linalg.DenseVector)

hadoop - 无法保留HIVE表

r - 在Hadoop服务器上分配R处理

java - 如何使用 Java 检查从 Spark 结构化流中的 Kafka 获取数据?

python - 将特定功能应用于 Spark 数据框中的结构化列的有效方法?

apache-spark - 排序后的数据框分区数?

pyspark - 如何从 Pyspark 中的另一列获取包含值列表的列