I have a query where I need to compute memory utilization using PySpark. I have already implemented this in Python with pandas using a pivot, but now I need to do it in PySpark, and pivot would be an expensive operation there, so I would like to know whether PySpark offers an alternative for this.
time_stamp Hostname kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1 memory Total 100
2019/08/17 10:01:06 Server1 memory used 35
2019/08/17 10:01:09 Server1 memory buffer 8
2019/08/17 10:02:04 Server1 memory cached 10
2019/08/17 10:01:05 Server2 memory Total 100
2019/08/17 10:01:06 Server2 memory used 42
2019/08/17 10:01:09 Server2 memory buffer 7
2019/08/17 10:02:04 Server2 memory cached 9
2019/08/17 10:07:05 Server1 memory Total 100
2019/08/17 10:07:06 Server1 memory used 35
2019/08/17 10:07:09 Server1 memory buffer 8
2019/08/17 10:07:04 Server1 memory cached 10
2019/08/17 10:08:05 Server2 memory Total 100
2019/08/17 10:08:06 Server2 memory used 35
2019/08/17 10:08:09 Server2 memory buffer 8
2019/08/17 10:08:04 Server2 memory cached 10
This needs to be transformed into:
time_stamp Hostname kpi Percentage
2019-08-17 10:05:00 Server1 memory 17
2019-08-17 10:05:00 Server2 memory 26
2019-08-17 10:10:00 Server1 memory 17
2019-08-17 10:10:00 Server2 memory 17
The Python code I used:
import numpy as np
import pandas as pd

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
# 5 minutes in nanoseconds; round every timestamp up to its 5-minute bucket
ns5min = 5 * 60 * 1000000000
df3['time_stamp'] = pd.to_datetime((df3['time_stamp'].astype(np.int64) // ns5min + 1) * ns5min)
df4 = df3.pivot_table('value_current', ['time_stamp', 'Hostname', 'kpi'], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100
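The `ns5min` line is what produces the 10:05:00 / 10:10:00 stamps in the expected output: it advances each timestamp to the closing edge of its 5-minute bucket. A minimal stdlib sketch of just that rounding step (the helper name is mine, not from the question):

```python
from datetime import datetime, timezone

NS_5MIN = 5 * 60 * 1_000_000_000  # five minutes in nanoseconds

def next_5min_boundary(ts_ns: int) -> int:
    """Mirror (ts // ns5min + 1) * ns5min: advance an epoch-nanosecond
    timestamp to the end of its 5-minute bucket."""
    return (ts_ns // NS_5MIN + 1) * NS_5MIN
```

For example, 2019-08-17 10:01:05 lands in the bucket that closes at 10:05:00.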
I am looking for a way to replicate this in PySpark, and for a more efficient approach in general, because pivot is an expensive operation and I need to run this every 5 minutes on a very large dataset.
Best Answer
Pivoting is expensive when the list of values that become columns is unknown. Spark has an overloaded pivot method that takes them as an argument:
def pivot(pivotColumn: String, values: Seq[Any])
If they are not known, Spark has to sort and collect the distinct values from the dataset. Otherwise the logic is quite simple, and is described here:
The implementation adds a new logical operator (o.a.s.sql.catalyst.plans.logical.Pivot). That logical operator is translated by a new analyzer rule (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) that currently translates it into an aggregation with lots of if statements, one expression per pivot value.
For example, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") would be translated into the equivalent of df.groupBy("A", "B").agg(expr("sum(if(C = 'small', D, null))"), expr("sum(if(C = 'large', D, null))")). You could have done this yourself but it would get long and possibly error prone quickly.
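The rewrite described in that quote is easy to picture outside of Spark. A toy plain-Python sketch (hypothetical data; this is not the Spark API) of the same plan, keeping one conditional sum per known pivot value instead of a generic pivot:

```python
# Toy rows standing in for a DataFrame with columns A, B, C, D (made-up data).
rows = [
    {"A": "x", "B": 1, "C": "small", "D": 10},
    {"A": "x", "B": 1, "C": "large", "D": 20},
    {"A": "x", "B": 1, "C": "small", "D": 5},
]

def manual_pivot_sum(rows, pivot_values=("small", "large")):
    """Group by (A, B) and keep one conditional sum per pivot value,
    mirroring sum(if(C = 'small', D, null)) and sum(if(C = 'large', D, null))."""
    out = {}
    for r in rows:
        key = (r["A"], r["B"])
        sums = out.setdefault(key, {v: 0 for v in pivot_values})
        if r["C"] in sums:
            sums[r["C"]] += r["D"]
    return out
```

With the rows above, the group ("x", 1) ends up with small = 15 and large = 20.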
Without pivoting, I would do something like this:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// assumes the CSV has a header row with the column names shown above
val in = spark.read.option("header", "true").csv("input.csv")
  // parse the string timestamp and cast it to a proper TimestampType
  .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
  .drop($"time_stamp")
Now we can group the dataset by time window and hostname, and collect the KPI metrics into a map.
There is an excellent answer on Stack Overflow describing exactly this approach.
val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
.agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))
Output:
+------------------------------------------+--------+-------------------------------------------------------------+
|window |Hostname|metrics |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+
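The same group-into-a-map idea can be sketched in plain Python, using sample rows from the question's first Server1 window (the helper name is mine): bucket each row into the start of its 5-minute window and merge the kpi_subtype → value pairs per (window, host), with no pivot involved.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Sample rows taken from the question's data (first Server1 window only).
rows = [
    ("2019/08/17 10:01:05", "Server1", "Total", 100.0),
    ("2019/08/17 10:01:06", "Server1", "used", 35.0),
    ("2019/08/17 10:01:09", "Server1", "buffer", 8.0),
    ("2019/08/17 10:02:04", "Server1", "cached", 10.0),
]

def window_start(ts: str) -> datetime:
    """Floor a timestamp string to the start of its 5-minute window,
    as window($"timestamp", "5 minutes") does."""
    t = datetime.strptime(ts, "%Y/%m/%d %H:%M:%S")
    return t - timedelta(minutes=t.minute % 5, seconds=t.second)

# Merge all kpi_subtype -> value pairs into one map per (window, host),
# mirroring collect_list(map(...)) followed by the joinMap udf.
metrics = defaultdict(dict)
for ts, host, subtype, value in rows:
    metrics[(window_start(ts), host)][subtype] = value
```

All four rows land in the 10:00:00 window for Server1, giving one map with the Total, used, buffer, and cached entries.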
Now we define some aliases and a simple select statement:
val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")
val result = grouped.select($"window", $"Hostname",
(total - ((total - used + buffer + cached) / total) * 100).as("percentage"))
And there we go:
+------------------------------------------+--------+----------+
|window |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0 |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0 |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0 |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0 |
+------------------------------------------+--------+----------+
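Reading the percentage off the collected map needs no pivot at all. A plain-Python sketch of the same select expression, plugged with the Server1 metrics from the first window (a hypothetical map literal, as the grouping step would produce):

```python
# Hypothetical metrics map for one (window, host) pair.
m = {"Total": 100.0, "used": 35.0, "buffer": 8.0, "cached": 10.0}
total, used, buffer, cached = m["Total"], m["used"], m["buffer"], m["cached"]

# Same arithmetic as the select expression in the answer above.
percentage = total - (total - used + buffer + cached) / total * 100
```

For these values the result is 17, matching the first Server1 row of the output.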
On "python - an efficient way to group without pivot in pyspark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57541507/