python - Applying a window function with a non-constant frame size in Spark

Tags: python apache-spark pyspark window-functions

My problem

I'm currently struggling with Spark window functions. I'm using Spark (through pyspark) version 1.6.3 (the associated Python version is 2.6.6). I run a pyspark shell instance that automatically initializes a HiveContext as my sqlContext.

I want to compute a rolling sum using a window function. My problem is that the window frame is not fixed: it depends on the observation under consideration. More specifically, I order my data by a variable called rank_id and, for any observation with index $x$, I want to compute a rolling sum over the observations with indexes between $x+1$ and $2x-1$ (for example, the row with rank_id 3 should be summed over the rows with rank_id 4 and 5). My rangeBetween therefore has to depend on the value of rank_id.

An important point is that I don't want to collect the data, so I can't use anything like numpy (my data has many observations).

Reproducible example

from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")

#df.show(2)
#+-------------------+------------------+                                        
#|                  x|                 y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows

# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+                             
#|                   x|                   y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458|      1|
#| 0.10943843181953838|  0.6478505849227775|      2|
#| 0.13916818312857027| 0.24165348228464578|      3|
#+--------------------+--------------------+-------+
#only showing top 3 rows

Fixed-width cumulative sum: no problem

Using a window function, I'm able to run a cumulative sum over a given number of indexes (I use rangeBetween here, but for this example rowsBetween could be used interchangeably).

w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+          
#|                   x|                   y|rank_id|             roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458|      1|0.9698521852602887|
#| 0.10943843181953838|  0.6478505849227775|      2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578|      3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows

Cumulative sum with a non-fixed width

I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass that to Spark (in a way similar to what we do for orderBy, maybe that's the problem), I get the following error

# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)

Traceback (most recent call last): File "", line 1, in TypeError: cannot concatenate 'str' and 'int' objects

I tried something else, using an SQL statement instead

# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
   FROM tempdf;
""")

which this time gives me the following error

Traceback (most recent call last): File "", line 6, in File "/opt/application/Spark/current/python/pyspark/sql/context.py", line >580, in sql return DataFrame(self._ssql_ctx.sql(sqlQuery), self) File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"

I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which may mean I'm not passing the SQL statement to Spark correctly

df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN -1 AND 1) AS cumsum
   FROM tempdf;
 """)

Traceback (most recent call last): File "", line 6, in File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql return DataFrame(self._ssql_ctx.sql(sqlQuery), self) File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"

How could I solve my problem by using either window functions or an SQL statement within Spark?

Best answer

How could I solve my problem by using either window or SQL statement within Spark?

TL;DR You can't, or at least not in a scalable way with the current requirements. You can try something similar to sliding over an RDD: How to transform data with sliding window over time series data in Pyspark

I also noticed that when I try a more simple statement using SQL OVER clause, I got a similar error which maybe means I am not passing SQL statement correctly to Spark

That is incorrect. The range specification requires a ( PRECEDING | FOLLOWING | CURRENT ROW ) boundary. There should also be no semicolon:

SELECT *, SUM(x)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf
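As a quick sanity check (not part of the original answer), the corrected statement can be passed to the same sqlContext and temp table registered in the question; a minimal sketch, assuming tempdf is still registered:

# Sanity check (assumption: tempdf from the question is still registered)
df2 = sqlContext.sql("""
   SELECT *, SUM(x)
   OVER (ORDER BY rank_id
   RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
   FROM tempdf
""")
df2.show(3)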

I want to sum between indexes x+1 and 2x-1 where x is my row index. When I try to pass it to Spark (in similar way we do for orderBy maybe that's the problem), I got the following error ...

TypeError: cannot concatenate 'str' and 'int' objects

As the exception says - you cannot call + on a string and an integer. You probably wanted columns:

from pyspark.sql.functions import col

.rangeBetween(col('rank_id') + 1,  2* col('rank_id') - 1)

But this is not supported. The range has to be of a fixed size and cannot be defined in terms of expressions.

An important point is that I don't want to collect data

A window definition without partitionBy:

w = Window.orderBy('rank_id').rangeBetween(-1,3)

is about as bad as collecting. So even if there were workarounds for the "dynamic frame" problem (with conditionals and unbounded windows), they wouldn't help you here.
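Not part of the original answer: if the variable-width sum is needed despite the scalability caveat, one possible workaround is a range self-join rather than a window function, i.e. join the DataFrame to itself on the frame condition and aggregate. This is only a sketch under that assumption, and it inherits the same scalability problem, since Spark has to evaluate a non-equi join:

import pyspark.sql.functions as psf

# Hypothetical workaround, not from the answer: pair each row (alias 'l') with the
# rows (alias 'r') whose rank_id falls inside its frame [rank_id + 1, 2 * rank_id - 1],
# then sum y per left-hand row. Rows whose frame is empty get a null sum.
left = df.alias('l')
right = df.alias('r')

frame_cond = (
    (psf.col('r.rank_id') >= psf.col('l.rank_id') + 1) &
    (psf.col('r.rank_id') <= 2 * psf.col('l.rank_id') - 1)
)

df_var = (left
    .join(right, frame_cond, 'left_outer')
    .groupBy('l.rank_id', 'l.x', 'l.y')
    .agg(psf.sum('r.y').alias('roll_variable'))
    .orderBy('rank_id'))

Because the join condition is an inequality, Spark falls back to a nested-loop style join, so this only makes sense for moderately sized data, which is consistent with the TL;DR above.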

The original question, python - Applying a window function with a non-constant frame size in Spark, can be found on Stack Overflow: https://stackoverflow.com/questions/48185535/
