python - 根据用户 ID PySpark 在训练测试中分割行

标签 python apache-spark pyspark train-test-split

我有一个 PySpark 数据框,其中每个用户包含多行:

<表类=“s-表”> <标题> 用户ID Action 时间 <正文> 1 购买 上午 8 点 1 购买 上午 9 点 1 出售 下午2点 1 出售 下午 3 点 2 出售 上午 10 点 2 购买 上午 11 点 2 出售 下午2点 2 出售 下午 3 点

我的目标是将这个数据集分成训练集和测试集,对于每个 userId ,N % 的行在训练集中,100-N % 的行在测试集中。例如,给定 N=75%,训练集将为

<表类=“s-表”> <标题> 用户ID Action 时间 <正文> 1 购买 上午 8 点 1 购买 上午 9 点 1 出售 下午2点 2 出售 上午 10 点 2 购买 上午 11 点 2 出售 下午2点

测试集将是

<表类=“s-表”> <标题> 用户ID Action 时间 <正文> 1 出售 下午 3 点 2 出售 下午 3 点

有什么建议吗?行是根据列时间排序的,我不认为 Spark 的 RandomSplit可能会有所帮助,因为我无法对特定列进行分层

最佳答案

我们有类似的需求并通过以下方式解决:

data = [
  (1, "buy"),
  (1, "buy"),
  (1, "sell"),
  (1, "sell"),
  (2, "sell"),
  (2, "buy"),
  (2, "sell"),
  (2, "sell"),
]

df = spark.createDataFrame(data, ["userId", "action"])

使用Window功能创建序列行号。还按每个 userId 计算记录数。这将有助于计算要过滤的记录的百分比。

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
window = Window.partitionBy(df["userId"]).orderBy(df["userId"])
df_count = df.groupBy("userId").count().withColumnRenamed("userId", "userId_grp")
df = df.join(df_count, col("userId") == col("userId_grp"), "left").drop("userId_grp")
df = df.select("userId", "action", "count", row_number().over(window).alias("row_number"))

df.show()
+------+------+-----+----------+
|userId|action|count|row_number|
+------+------+-----+----------+
|     1|   buy|    4|         1|
|     1|   buy|    4|         2|
|     1|  sell|    4|         3|
|     1|  sell|    4|         4|
|     2|  sell|    4|         1|
|     2|   buy|    4|         2|
|     2|  sell|    4|         3|
|     2|  sell|    4|         4|
+------+------+-----+----------+

按所需百分比过滤培训记录:

n = 75
df_train = df.filter(col("row_number") <= col("count") * n / 100)
df_train.show()
+------+------+-----+----------+
|userId|action|count|row_number|
+------+------+-----+----------+
|     1|   buy|    4|         1|
|     1|   buy|    4|         2|
|     1|  sell|    4|         3|
|     2|  sell|    4|         1|
|     2|   buy|    4|         2|
|     2|  sell|    4|         3|
+------+------+-----+----------+

剩余的记录进入测试集:

df_test = df.alias("df").join(df_train.alias("tr"), (col("df.userId") == col("tr.userId")) & (col("df.row_number") == col("tr.row_number")), "leftanti")
df_test.show()
+------+------+-----+----------+
|userId|action|count|row_number|
+------+------+-----+----------+
|     1|  sell|    4|         4|
|     2|  sell|    4|         4|
+------+------+-----+----------+

关于python - 根据用户 ID PySpark 在训练测试中分割行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73632648/

相关文章:

hadoop - Cloudera Manager Yarn 和 Spark UI 不工作

python - 接收未知列数的 Spark UDF

python - 我可以在本地计算机上使用Apache Spark处理100 GB的数据吗?

python: <type 'exceptions.UnicodeEncodeError' > 为什么会这样?

python - 同时更新多个标签时 Tkinter 没有响应

android - 使用 Anaconda Python 3.6 在 Windows 7 上安装 Kivy

python - 提高spark sql的并行性

python - Runge-Kutta 代码不与内置方法收敛

apache-spark - Spark-Cassandra 与 Spark-Elasticsearch

apache-spark - Pyspark 仅使用 sudo 运行