apache-spark - 无法解析给定输入列 : Spark-SQL 的 'columnname'

标签 apache-spark

我每天运行此代码,并将输出保存在某个选定的位置:

  def aggConnections()(input: DataFrame) = {
    input
      .groupBy (
        $"spid" (0).as ("domain_userid"),
        $"cxid" (0).as ("key_id") )
      .agg (
        min ($"Time").as ("first_seen"),
        max ($"Time").as ("last_seen") )
      .select ($"domain_userid", $"key_id",
        lit ("cxense_id").as ("key_source"),
        lit (1).as ("times_seen"),
        $"first_seen", $"last_seen")
      .filter($"domain_userid".isNotNull && $"key_id".isNotNull)
  }

 val requests = spark.read.parquet("day1").transform(aggConnections())

spid 是一个数组,如下所示:

spid:array
     element:string

这就是为什么我必须像 $"spid"(0).as ("domain_userid") 那样访问它

遗憾的是,有时在运行 Spark 3.0.1 的 AWS EMR 上运行此作业时出现以下错误:

diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`spid`' given input columns: [AdUnitId, AudienceSegmentIds, BandWidth, BandwidthGroupId, BandwidthId, Browser, BrowserId, City, CityId, CmsMetadata, Country, CountryId, DeviceCategory, Domain, GfpContentId, IsCompanion, IsFilledRequest, IsInterstitial, IsVideoFallbackRequest, KeyPart, Metro, MetroId, MobileAppId, MobileCapability, MobileCarrier, MobileDevice, OS, OSId, OSVersion, PodPosition, PostalCode, PostalCodeId, PublisherProvidedID, RefererURL, Region, RegionId, RequestLanguage, RequestedAdUnitSizes, Time, TimeUsec2, UserId, VideoPosition, addefend, ads_enabled, app_name, app_version, bereidingstijd, brand, careerlevel, cat, category, categorycode, categoryname, channel_id, channel_title, cid, cmssource, companyname, compid, content_url, contentid, contentsubstype, crt_cpm, crt_size, cxid, cxsg, dc_yt, deviceid, dos, dossier, educationlevel, floor, fsr, gang, gdpr_applies, gdpr_consentstring, gelegenheid, hb_pb, hb_size, hour, ifa, industry, ingredient, iom, itemid, ix_apnx_om, ix_cdb_om, ix_imdi_cpm, jobtitle, k21, kage, kar, kauth, kembed, keuken, kgender, klg, ko, kpid, kvlg, kvz, kw, lat, long, mc_asset_type, mediation, model, moeilijkheid, origin, pag, pagetype, path, pay, pos, positie, production_id, productname, retailercode, retailername, rpfl_12108, screen, sector, shopid, show_id, show_title, soort, src, stad, starttype, station, subforum, tag, theme, top, video_duration, video_label, yt_vrallowed, ytdevice];; 'Aggregate ['spid[0], cxid#136[0]], ['spid[0] AS domain_userid#276, cxid#136[0] AS key_id#277, min(Time#0) AS first_seen#417, max(Time#0) AS last_seen#419]

我认为问题是因为 spid 列不存在,但是当我检查架构时它存在于数据中。因此,我决定在Databricks上进行更准确的测试。奇怪的是,如果我在运行相同 Spark 版本 3.0.1 的 Databricks 上运行此代码,则每天的数据一切正常,包括在 EMR 上失败的数据。我真的无法解释发生了什么。

最佳答案

我发现我的每日分区也保存在每小时分区中。这些每小时分区的架构可能略有不同,因为有些小时错过了我的 spid 列。因此,在某些日子里,我的生产环境出现故障,并出现上面报告的错误。

该错误很难发现,因为我的测试实例将 option("mergeSchema", "true") 作为默认 Spark 选项,因此它在合并架构时没有遇到错误。

关于apache-spark - 无法解析给定输入列 : Spark-SQL 的 'columnname',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65505941/

相关文章:

hadoop - pyspark.sql.utils.AnalysisException : u'Path does not exist

scala - 如何找到矩阵中每列的五个第一个最大索引?

scala - 如何将路径列表传递给 spark.read.load?

apache-spark - 从 Eclipse 和 Spark Context 将 Spark 应用程序作为 yarn 作业提交

java - 如何从Python程序创建的文件中读取java程序中的RDD

apache-spark - 如何在 Google Dataproc 中发送失败作业的警报通知?

mysql - 在不知道上限的情况下使用spark并行读取sql数据库

scala - 使用batchsize并写入RDBMS(ORACLE、MsSQL等)时,如何在每个批处理中进行spark提交?

apache-spark - 使用 scala 从 Spark 中的数组数组中的结构中提取值

mongodb - 在 Scala 中从 mongoDB 读取