I run this code every day and save the output to a chosen location:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._  // `spark` is the active SparkSession (e.g. in spark-shell)

def aggConnections()(input: DataFrame) = {
  input
    .groupBy(
      $"spid"(0).as("domain_userid"),
      $"cxid"(0).as("key_id"))
    .agg(
      min($"Time").as("first_seen"),
      max($"Time").as("last_seen"))
    .select($"domain_userid", $"key_id",
      lit("cxense_id").as("key_source"),
      lit(1).as("times_seen"),
      $"first_seen", $"last_seen")
    .filter($"domain_userid".isNotNull && $"key_id".isNotNull)
}

val requests = spark.read.parquet("day1").transform(aggConnections())
spid is an array column with this schema:

spid: array
    element: string

which is why I have to access it as $"spid"(0).as("domain_userid").
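For reference, there are several equivalent ways to take the first element of an array column in Spark SQL. A minimal sketch (`df` stands in for any DataFrame with the question's spid column):

```scala
import org.apache.spark.sql.functions.element_at

// All three expressions extract the first element of the array column:
df.select($"spid"(0).as("domain_userid"))             // apply syntax, 0-based
df.select($"spid".getItem(0).as("domain_userid"))     // Column.getItem, 0-based
df.select(element_at($"spid", 1).as("domain_userid")) // SQL element_at, 1-based
```

All of them return null when the array is empty or null, so the trailing isNotNull filter in the question still does its job.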
Unfortunately, when running this job on AWS EMR with Spark 3.0.1, I sometimes get the following error:
diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`spid`' given input columns: [AdUnitId, AudienceSegmentIds, BandWidth, BandwidthGroupId, BandwidthId, Browser, BrowserId, City, CityId, CmsMetadata, Country, CountryId, DeviceCategory, Domain, GfpContentId, IsCompanion, IsFilledRequest, IsInterstitial, IsVideoFallbackRequest, KeyPart, Metro, MetroId, MobileAppId, MobileCapability, MobileCarrier, MobileDevice, OS, OSId, OSVersion, PodPosition, PostalCode, PostalCodeId, PublisherProvidedID, RefererURL, Region, RegionId, RequestLanguage, RequestedAdUnitSizes, Time, TimeUsec2, UserId, VideoPosition, addefend, ads_enabled, app_name, app_version, bereidingstijd, brand, careerlevel, cat, category, categorycode, categoryname, channel_id, channel_title, cid, cmssource, companyname, compid, content_url, contentid, contentsubstype, crt_cpm, crt_size, cxid, cxsg, dc_yt, deviceid, dos, dossier, educationlevel, floor, fsr, gang, gdpr_applies, gdpr_consentstring, gelegenheid, hb_pb, hb_size, hour, ifa, industry, ingredient, iom, itemid, ix_apnx_om, ix_cdb_om, ix_imdi_cpm, jobtitle, k21, kage, kar, kauth, kembed, keuken, kgender, klg, ko, kpid, kvlg, kvz, kw, lat, long, mc_asset_type, mediation, model, moeilijkheid, origin, pag, pagetype, path, pay, pos, positie, production_id, productname, retailercode, retailername, rpfl_12108, screen, sector, shopid, show_id, show_title, soort, src, stad, starttype, station, subforum, tag, theme, top, video_duration, video_label, yt_vrallowed, ytdevice];; 'Aggregate ['spid[0], cxid#136[0]], ['spid[0] AS domain_userid#276, cxid#136[0] AS key_id#277, min(Time#0) AS first_seen#417, max(Time#0) AS last_seen#419]
I assumed the problem was that the spid column does not exist, but when I check the schema it is present in the data. So I decided to run a more precise test on Databricks. Strangely, if I run the same code on Databricks with the same Spark version, 3.0.1, everything works for every day of data, including the days that fail on EMR. I really cannot explain what is going on.
Best Answer
I found out that my daily partitions were also saved as hourly partitions. The schemas of these hourly partitions can differ slightly, because some hours are missing my spid column. As a result, the job failed in production on certain days with the error reported above.
The bug was hard to spot because my test instance has option("mergeSchema", "true") set as a default Spark option, so it merged the schemas and never hit the error.
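A minimal sketch of the fix, combining the answer's mergeSchema option with a defensive fallback (the fallback is my addition, not part of the original answer; "day1" is the path from the question):

```scala
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{ArrayType, StringType}

// Enable schema merging explicitly so hourly part-files that lack
// the `spid` column are reconciled into one schema (spid is null
// for those rows) instead of failing analysis.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("day1")

// Guard against days where *no* file contains `spid` at all:
// add it as a null array<string> column before aggregating.
val safe =
  if (merged.columns.contains("spid")) merged
  else merged.withColumn("spid", lit(null).cast(ArrayType(StringType)))

val requests = safe.transform(aggConnections())
```

Note that mergeSchema makes the read more expensive, since Spark has to inspect the footers of more Parquet files; enabling it only for sources known to have drifting schemas is a reasonable middle ground.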
A similar question about apache-spark - cannot resolve 'columnname' given input columns: Spark-SQL was found on Stack Overflow: https://stackoverflow.com/questions/65505941/