python - pyspark,比较数据框中的两行

标签 python apache-spark pyspark apache-spark-sql

我正在尝试将数据框中的一行与下一行进行比较,以查看时间戳的差异。目前数据如下:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07

我已经尝试将一个函数映射到数据帧上以允许像这样进行比较:(注意:我正在尝试获取差异大于 4 小时的行)

items = df.limit(10)\
          .orderBy('itemid', desc('stamp'))\
          .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()

但我收到以下错误:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe

我认为这是由于我错误地使用了 map 功能。帮助使用 map ,或其他解决方案将不胜感激。

更新: @zero323 的回答说明了我对映射的不当使用,但是我使用的系统运行的是 2.02 之前的 Spark 版本,并且我正在使用 Cassandra 中的数据。

我设法用 mapPartitions 解决了这个问题。请参阅下面我的回答。

更新(2017/03/27): 自从最初在这篇文章上标记答案以来,我对 Spark 的理解有了显着提高。我在下面更新了我的答案以显示我当前的解决方案。

最佳答案

是的,您以错误的方式使用了 map 函数。 map 当时对单个元素进行操作。您可以尝试使用这样的窗口函数:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)

关于python - pyspark,比较数据框中的两行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38229659/

相关文章:

apache-spark - 如何在 PySpark 中运行脚本

apache-spark - 如何分析 pyspark 工作

java - 如何在 Cassandra 中使用 Spark 获取行范围

python - F.monotonicly_increasing_id() 返回长随机数

javascript - Ajax 请求不返回 HttpResponse

python - 是否可以从理解内部将结果编译成唯一列表?

apache-spark - 使用 Spark 解码一组二进制文件

apache-spark - Spark /Pyspark : SVM - How to get Area-under-curve?

python - 如何覆盖 Flask-Security 默认消息?

python - 以通用方式为 Python 中的所有子类实现 __neg__