如何在 Pyspark 中实现以下 R 代码
l = data.frame(d=c(1,2,4,7,8,15,17,19,20,25,26,29))
l$d2[1]= 0
l$d3[1]=c=1
for(i in 2:nrow(l))
{ l$d2[i]=l$d[i]-l$d[i-1]
c= ifelse(l$d2[i]<=3,c,c+1)
l$d3[i]=c
}
l
我想迭代一列并在值大于或等于 3 时递增计数器。
例如:假设我的列中的元素是
1,2,2,3,2,1,5,2,1
标志应该是: 1,1,1,2,2,2,3,3,3
谢谢
最佳答案
假设以下是输入数据。
输入:
df = spark.createDataFrame([[1,'A',1],[2,'A',2],[3,'A',2],[4,'A',3],[5,'A',2],\
[6,'A',5],[7,'B',1],[8,'B',2],[9,'B',5],[10,'B',1]],\
['sl_no','partition','value'])
df.show(10)
- sl_no - 序列号[基本上是定义数据帧顺序的任何列]
- 分区 - 如果计数器需要基于现有列进行分区,则对列进行分区
- 值 - 基于计数器递增的值
输出:
以下代码将为您提供所需的输出。
from pyspark.sql import Window
from pyspark.sql.functions import col, when, sum, lit
threshold= 3
df = df.withColumn("greater",when(col("value")>=lit(threshold),1).otherwise(0))\
.withColumn("counter",sum("greater").over(Window.partitionBy().orderBy("sl_no")))\
.withColumn("partitioned_counter",sum("greater").over(Window.partitionBy(["partition"]).orderBy("sl_no")))\
.orderBy("sl_no")
df.show(10)
- sl_no - 序列号[基本上是定义数据帧顺序的任何列]
- 分区 - 如果计数器需要基于现有列进行分区,则对列进行分区
- 值 - 基于计数器递增的值
- 更大 - 检查该值是否大于阈值[在本例中为 3]
- 计数器 - 当值超过阈值时递增的计数器
- partitioned_counter - 按分区列分区的计数器
如果您只需要根据列的顺序和阈值创建总体计数器,则可以使用上面用于创建计数器列的代码。
如果用例是为一个分区列/一组分区列单独实现计数器,那么您可以使用用于创建partitioned_counter列的代码
关于python - 在pyspark中创建计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40466790/