sql - PySpark subquery: Accessing outer query column is not allowed

Tags: sql apache-spark pyspark apache-spark-sql

I'm trying to rewrite a SQL query in PySpark. Here is the SQL query:

SELECT 
cs.Environment, 
cs.AccountCode,  
MIN(cs.StartDate) AS StartDate,         
MIN(cs.FinalDate) AS FinalDate, 
(
    SELECT TOP 1 ItemCode 
    FROM [dbo].[Contracts] 
    WHERE 
        Environment = cs.Environment 
        AND AccountCode = cs.AccountCode 
        AND ContractType = 'C'      
        AND LinePackage = 1             
        AND InflowOutflow = 'Inflow'    
        AND EventDate <= GETDATE()      
    ORDER BY EventDate 
) AS Package

FROM [dbo].[Contracts] cs
WHERE 
    cs.ContractType = 'C'
    AND cs.LinePackage  = 1
GROUP BY 
    cs.Environment, 
    cs.AccountCode

My PySpark code looks like this:

df = spark.sql(
    """select cs.environment, cs.accountcode, 
              min(cs.startdatets) as startdate, min(cs.finaldatets) as finaldate,
              (select a.itemcode 
               from firstcomm as a 
               where a.environment = cs.environment and a.accountcode = cs.accountcode and a.contracttype = 'c' and a.eventdate <= current_date() 
               order by a.eventdate limit 1) as package 
        from firstcomm cs where cs.contracttype = 'c' and cs.linepackage  = 1 
        group by cs.environment, cs.accountcode""")

But I keep getting this error:

AnalysisException: Accessing outer query column is not allowed in:
LocalLimit 1
+- Project [itemcode#3641]
   +- Sort [eventdate#3629 ASC NULLS FIRST], true
      +- Project [itemcode#3641, eventdate#3629]
         +- Filter ((((environment#3628 = outer(environment#3628)) && (accountcode#3622 = outer(accountcode#3622))) && (contracttype#3626 = c)) && (((linepackage#3644 = 1) && (inflowoutflow#3637 = inflow)) && (eventdate#3629 <= current_date(Some(Zulu)))))
            +- SubqueryAlias a

By the way, I'm using Spark 2.2.1, which I believe supports subqueries.

Any ideas on how to fix this? Or how should I rewrite the query to get the desired result?

Best Answer

Are you comfortable using the PySpark DataFrame API? Spark won't let a correlated subquery reference outer-query columns underneath a LIMIT (that's the LocalLimit 1 node in your error), so the TOP 1 pattern needs to be rewritten; a window function does the same job:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# get the top item code for each environment/account code
package_df = contracts.filter(
    (F.col("ContractType") == "C")
    & (F.col("LinePackage") == 1)
    & (F.col("InflowOutflow") == "Inflow")
    & (F.col("EventDate") <= F.current_date())  # GETDATE() is T-SQL; use current_date()
)

# rank rows within each key by EventDate, earliest first, mirroring the SQL's ORDER BY EventDate
w = Window.partitionBy("Environment", "AccountCode").orderBy("EventDate")
package_df = package_df.withColumn("row_number", F.row_number().over(w))

# keep only the earliest row per key and expose its item code under the output name Package
package_df = package_df.filter(F.col("row_number") == 1).select(
    "Environment", "AccountCode", F.col("ItemCode").alias("Package")
)

# aggregate over your contracts table
contracts_df = contracts.filter((F.col("ContractType") == "C") & (F.col("LinePackage") == 1))

contracts_df = contracts_df.groupBy("Environment", "AccountCode").agg(
    F.min("StartDate").alias("StartDate"), F.min("FinalDate").alias("FinalDate")
)

# join to get the top item code
output_df = contracts_df.join(package_df, ["Environment", "AccountCode"])

I used a window function to get the top item code for each key, then joined it back onto the aggregated original DataFrame.
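
If you'd rather stay in spark.sql, the same window-function idea can be written directly in SQL. This is a minimal sketch (mine, not from the original answer), assuming firstcomm is the temp view from your query and re-applying the linepackage/inflowoutflow filters from the original T-SQL:

df = spark.sql(
    """with package as (
           select environment, accountcode, itemcode as package,
                  row_number() over (partition by environment, accountcode
                                     order by eventdate) as rn
           from firstcomm
           where contracttype = 'c' and linepackage = 1
                 and inflowoutflow = 'inflow' and eventdate <= current_date()
       ),
       agg as (
           select environment, accountcode,
                  min(startdatets) as startdate, min(finaldatets) as finaldate
           from firstcomm
           where contracttype = 'c' and linepackage = 1
           group by environment, accountcode
       )
       select a.environment, a.accountcode, a.startdate, a.finaldate, p.package
       from agg a
       left join package p
         on p.environment = a.environment
        and p.accountcode = a.accountcode
        and p.rn = 1""")

The left join keeps keys that have no qualifying package row (they get a NULL package), which matches what the scalar subquery would have returned; an inner join would drop them.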

A similar question about "sql - PySpark subquery: Accessing outer query column is not allowed" can be found on Stack Overflow: https://stackoverflow.com/questions/52836080/
