python - 在pyspark中创建计数器

标签 python r apache-spark pyspark

如何在 Pyspark 中实现以下 R 代码

l = data.frame(d=c(1,2,4,7,8,15,17,19,20,25,26,29))
l$d2[1]= 0
l$d3[1]=c=1
for(i in 2:nrow(l))
{ l$d2[i]=l$d[i]-l$d[i-1]
  c= ifelse(l$d2[i]<=3,c,c+1)
  l$d3[i]=c
 }
l

我想迭代一列并在值大于或等于 3 时递增计数器。

例如:假设我的列中的元素是

1,2,2,3,2,1,5,2,1

标志应该是: 1,1,1,2,2,2,3,3,3

谢谢

最佳答案

假设以下是输入数据。

输入:

df = spark.createDataFrame([[1,'A',1],[2,'A',2],[3,'A',2],[4,'A',3],[5,'A',2],\
                            [6,'A',5],[7,'B',1],[8,'B',2],[9,'B',5],[10,'B',1]],\
                            ['sl_no','partition','value'])
df.show(10)

input dataframe

  • sl_no - 序列号[基本上是定义数据帧顺序的任何列]
  • 分区 - 如果计数器需要基于现有列进行分区,则对列进行分区
  • - 基于计数器递增的值

输出:

以下代码将为您提供所需的输出。

from pyspark.sql import Window
from pyspark.sql.functions import col, when, sum, lit

threshold= 3

df = df.withColumn("greater",when(col("value")>=lit(threshold),1).otherwise(0))\
       .withColumn("counter",sum("greater").over(Window.partitionBy().orderBy("sl_no")))\
       .withColumn("partitioned_counter",sum("greater").over(Window.partitionBy(["partition"]).orderBy("sl_no")))\
       .orderBy("sl_no")

df.show(10)

output dataframe

  • sl_no - 序列号[基本上是定义数据帧顺序的任何列]
  • 分区 - 如果计数器需要基于现有列进行分区,则对列进行分区
  • - 基于计数器递增的值
  • 更大 - 检查该值是否大于阈值[在本例中为 3]
  • 计数器 - 当值超过阈值时递增的计数器
  • partitioned_counter - 按分区列分区的计数器

如果您只需要根据列的顺序和阈值创建总体计数器,则可以使用上面用于创建计数器列的代码。

如果用例是为一个分区列/一组分区列单独实现计数器,那么您可以使用用于创建partitioned_counter列的代码

关于python - 在pyspark中创建计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40466790/

相关文章:

r - 如何计算R中每个单元格中特定数字的个数

python - 如何使用conda安装考拉?

python - 如何可视化 h5 格式数据的图像?

r - 使用cut()时如何处理NA?

apache-spark - 如何根据特定组的计数删除 Spark 数据集中的行

apache-spark - 有没有办法清除齐柏林飞艇的内存?

scala - 使用 Spark 高阶函数时如何返回案例类?

python - Python 中的屏幕抓取

python - QtCore Signal 可以设置为当前类吗?

r - 向 ordiplot3d() 添加点