python - Spark : How to split an instance into Postivie/Negative Samples according to two columns

标签 python apache-spark pyspark kaggle

我有以下数据框:

df = sc.parallelize([(1, 2, 3, '2','1','1'), (4, 5, 6, '3','2','1')]).toDF(['ID1', 'ID2', 'ID3','Impressions','Clicks','ImpressionsMinusClicks'])
df.show()

enter image description here

我想将其转换为这个(但是不知道如何以及是否应用 split()explode() 来实现这一点):

enter image description here

这里的关键是基本上复制每个实例以匹配印象数(例如,10 个印象实例变成 10 行),然后在这些行中将它们标记为 # 点击次数作为正例,其余行标记为 # IMpressions -点击次数作为反面例子。总结一下:一个实例有 10 次展示和 3 次点击。我想将其转换为 10 行,3 个正样本(“1”表示点击)和 7 个负样本(“0”表示印象深刻/未点击)。目的是使用它作为分类模型的输入,例如朴素贝叶斯或逻辑回归。其起源是 Kaggle KDD Cup 2012 数据集。

最佳答案

您确实可以在 UDF 的结果上使用 explode 来生成一系列“事件” - 1 表示点击事件,0 表示未点击的印象事件:

// We create a UDF which expects two columns (imps and clicks) as input, 
// and returns an array of "is clicked" (0 or 1) integers
val toClickedEvents = udf[Array[Int], Int, Int] {
  case (imps, clicks) => {
    // First, we map the number of imps (e.g. 3) into a sequence
    // of "imps" indices starting from zero; Each one would later
    // represent a single impression "event"
    val impsIndices = (0 until imps)

    // we map each impression "event", represented by its index, 
    // into a 1 or a 0: depending if that event had a matching click;
    // we do that by assigning "1" to indices lower than the number of clicks
    // and "0" for the rest
    val clickIndicatorPerImp = impsIndices.map(index => if (clicks > index) 1 else 0)

    // finally we just convert into an array, to comply with the UDF signature
    clickIndicatorPerImp.toArray
  }
}

// explode the result of the UDF and calculate ImpressedNotClicked
df.withColumn("Clicked", explode(toClickedEvents($"Impressions", $"Clicks")))
  .select($"ID1", $"ID2", $"ID3", $"Clicked", abs($"Clicked" - lit(1)) as "ImpressedNotClicked")

注意:原始帖子已标记为 scala;如果您可以将其转换为 python,请随意编辑

关于python - Spark : How to split an instance into Postivie/Negative Samples according to two columns,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43685580/

相关文章:

python - PySpark - Spark DataFrame 数组与 Python 列表不同吗?

apache-spark - Spark增量加载覆盖旧记录

python - PySpark Dataframe.groupBy MapType 列

python - 在Python中模拟alt+tab

python - 使用 setup.py 安装 .desktop 文件

python - 如何绘制多列的条形图 3D 投影

python - 来自 Spark 的 Parquet 文件被检测为 Linux 中的目录

python - 使用 Python 格式化 HTML 到 JSON

apache-spark - Apache Beam Spark/flink运行程序未在EMR中执行(从GCS访问文件)

pyspark - 在 pyspark 中聚合 One-Hot 编码功能