python - 在 pyspark 中不使用 pivot 进行分组的有效方法

标签 python apache-spark pyspark apache-spark-sql

我有一个查询,我需要使用 pyspark 计算内存利用率。我已经使用 pivot 使用 python pandas 实现了这一点,但现在我需要在 pyspark 中完成它并且旋转将是一个昂贵的功能所以我想知道 pyspark 中是否有任何替代方案用于此解决方案

time_stamp          Hostname    kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1     memory  Total       100
2019/08/17 10:01:06 Server1     memory  used        35
2019/08/17 10:01:09 Server1     memory  buffer      8
2019/08/17 10:02:04 Server1     memory  cached      10
2019/08/17 10:01:05 Server2     memory  Total       100
2019/08/17 10:01:06 Server2     memory  used        42
2019/08/17 10:01:09 Server2     memory  buffer      7
2019/08/17 10:02:04 Server2     memory  cached      9
2019/08/17 10:07:05 Server1     memory  Total       100
2019/08/17 10:07:06 Server1     memory  used        35
2019/08/17 10:07:09 Server1     memory  buffer      8
2019/08/17 10:07:04 Server1     memory  cached      10
2019/08/17 10:08:05 Server2     memory  Total       100
2019/08/17 10:08:06 Server2     memory  used        35
2019/08/17 10:08:09 Server2     memory  buffer      8
2019/08/17 10:08:04 Server2     memory  cached      10

需要转化为

time_stamp      Hostname    kpi Percentage
2019-08-17 10:05:00 Server1     memory  17
2019-08-17 10:05:00 Server2     memory  26
2019-08-17 10:10:00 Server1     memory  17
2019-08-17 10:10:00 Server2     memory  17

我用的Python代码

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
ns5min=5*60*1000000000 
df3['time_stamp'] = pd.to_datetime(((df3['time_stamp'].astype(np.int64) // ns5min + 1 ) * ns5min))
df4 = df3.pivot_table('value_current' , ['time_stamp' , 'Hostname ' , 'kpi' ], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100

寻找在 pyspark 中复制它的方法以及在 python 中更有效的方法,因为 pivot 是一项昂贵的操作,我需要每 5 分钟在一个非常大的数据集上执行一次

最佳答案

当转换为列的值列表未知时,透视是昂贵的。 Spark 有一个重载的 pivot 方法,将它们作为参数。

def pivot(pivotColumn: String, values: Seq[Any])

如果不知道它们,Spark 必须对数据集中的不同值进行排序和收集。否则,逻辑非常简单并描述为 here .

The implementation adds a new logical operator (o.a.s.sql.catalyst.plans.logical.Pivot). That logical operator is translated by a new analyzer rule (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) that currently translates it into an aggregation with lots of if statements, one expression per pivot value.

For example, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") would be translated into the equivalent of df.groupBy("A", "B").agg(expr(“sum(if(C = ‘small’, D, null))”), expr(“sum(if(C = ‘large’, D, null))”)). You could have done this yourself but it would get long and possibly error prone quickly.

如果不旋转,我会做类似的事情:

val in = spark.read.csv("input.csv")
      //cast to the unix timestamp
      .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
      .drop($"time_stamp")

现在我们可以使用主机名按时间窗口对数据集进行分组,并将 KPI 指标收集到 map 中。
有一个优秀的answer就是这么描述的。

val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }

val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
  .agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))

输出

+------------------------------------------+--------+-------------------------------------------------------------+
|window                                    |Hostname|metrics                                                      |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+

现在我们定义一些别名和一个简单的选择语句:

val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")

val result = grouped.select($"window", $"Hostname",
          (total - ((total - used + buffer + cached) / total) * 100).as("percentage"))

我们开始吧:

+------------------------------------------+--------+----------+
|window                                    |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0      |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0      |
+------------------------------------------+--------+----------+

关于python - 在 pyspark 中不使用 pivot 进行分组的有效方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57541507/

相关文章:

apache-spark - 加载表时从 phoenix 过滤

python - 如何在 SSH 关闭的情况下在 MySQL 中存储来自 Twitter Streaming API 的推文

python - 如何打乱列表,以便每个特定长度的子列表都有唯一的项目?

apache-spark - pyspark addPyFile 添加 .py 文件的 zip,但仍未找到模块

hadoop - Spark 写入 hdfs 不使用 saveAsNewAPIHadoopFile 方法

python - 加入 3 个数据框时遇到问题 - pyspark

python - 在python中将txt文件读取到不带换行符的列表中

python - OpenCV 和 Tesseract 进行门标签检测

apache-spark - 尝试保存 Pyspark Dataframe,但出现 Py4JNetworkError - UBUNTU

scala - 如何在 Scala Spark 项目中使用 PySpark UDF?