我有一个非常巨大的集群,包含 20 个 m4.Xlarge 实例。 我的文件大小为 20GB,文件中的记录数为 193944092。
从这个文件中我需要三个信息。
1. 记录总数
2. 不同记录总数
3. 基于一列的不同记录总数 (FundamentalSeriesId
)。
当我运行下面的代码时,需要很长时间。计算记录总数需要 7 分钟。
但是对于 FundamentalSeriesId 列的不同记录总数和不同记录总数,它花费了很长时间,我的意思是我已经取消了查询,因为它花费了很长时间。
如果有人可以改进我的代码,那就太好了。我可以使用缓存或其他东西来更快地获取信息吗?
这就是我正在做的
val rdd = sc.textFile("s3://kishore-my-bucket-trf/Fundamental.FundamentalAnalytic.FundamentalAnalytic.SelfSourcedPublic.2011.1.2018-02-18-1340.Full.txt.gz")
println("Total count="+rdd.count())
val header = rdd.filter(_.contains("FundamentalSeriesId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("FundamentalSeriesId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
println("distinct count="+data.distinct.count())
val data1=data.select($"FundamentalSeriesId")
println("count of distinct FundamentalSeriesId column="+data1.distinct.count())
我的样本记录是这样的..
FundamentalSeriesId|^|FundamentalSeriesId.objectTypeId|^|FundamentalSeriesId.objectType_1|^|financialPeriodEndDate|^|financialPeriodType|^|lineItemId|^|analyticItemInstanceKey_1|^|AnalyticValue_1|^|AnalyticConceptCode_1|^|AnalyticValue.currencyId_1|^|AnalyticIsEstimated_1|^|AnalyticAuditabilityEquation_1|^|FinancialPeriodTypeId_1|^|AnalyticConceptId_1|^|sYearToDate|^|IsAnnual_1|^|TaxonomyId_1|^|InstrumentId_1|^|AuditID_1|^|PhysicalMeasureId_1|^|FFAction_1
最佳答案
Distinct 是 Spark 中的常见问题,如果可以的话,请使用 countApproxDistinct 代替。
关于apache-spark - 为什么我的count,Distinct和Distinct count在spark的巨大集群中非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48931645/