I have this DataFrame in PySpark:
df = spark.createDataFrame([
    ("TenantId", "TennatId_1"),
    ("TimeGenerated", "2023-04-17T11:50:51.9013145Z"),
    ("ActivityType", "Connection"),
    ("CorrelationId", "608dd49a"),
    ("UserName", "test_1@test.cloud"),
    ("Name", "Name1"),
    ("Source", "Client"),
    ("Parameters", "{}"),
    ("SourceSystem", "Azure"),
    ("Type", "Check"),
    ("_ResourceId", "/subscriptions/5286ce"),
    ("TenantId", "TennatId_2"),
    ("TimeGenerated", "2023-04-17T11:50:51.944022Z"),
    ("ActivityType", "Connection"),
    ("CorrelationId", "11c0d75f0000"),
    ("UserName", "test_2@test.cloud"),
    ("Name", "Name2"),
    ("Source", "Client"),
    ("Parameters", "{}"),
    ("SourceSystem", "Azure"),
    ("Type", "Check"),
    ("_ResourceId", "/subscriptions/5286ce38-272f-4c54")], ["name", "rows"])
I want to pivot it so that the values in name become columns.
I tried the following expression:
from pyspark.sql.functions import expr

pivoted_df = df.groupBy("name") \
    .pivot("name") \
    .agg(expr("first(rows) as rows")) \
    .orderBy("name")
But the output was not what I expected.
My expected output is one row per record, with each name as a column.
How can I achieve this?
Best Answer
Use groupBy on a literal constant, pivot on name, and aggregate rows with first to get the desired output.
Example:
from pyspark.sql.functions import *
df.groupBy(lit(1)).pivot("name").agg(first(col("rows"))).drop("1").show(10,False)
#+------------+-------------+-----+----------+------+------------+----------+----------------------------+-----+-----------------+---------------------+
#|ActivityType|CorrelationId|Name |Parameters|Source|SourceSystem|TenantId  |TimeGenerated               |Type |UserName         |_ResourceId          |
#+------------+-------------+-----+----------+------+------------+----------+----------------------------+-----+-----------------+---------------------+
#|Connection  |608dd49a     |Name1|{}        |Client|Azure       |TennatId_1|2023-04-17T11:50:51.9013145Z|Check|test_1@test.cloud|/subscriptions/5286ce|
#+------------+-------------+-----+----------+------+------------+----------+----------------------------+-----+-----------------+---------------------+
Update: the query above collapses all records into a single row, because first takes only the first value per key. To get one row per record, tag the row that starts each record and build a group id with a running sum over a window:
from pyspark.sql.window import Window

# define window ordered by the monotonically increasing id
w = Window.partitionBy(lit("1")).orderBy("mid")

# add an order-id column and a record-start flag: temp_win is 1 on the row that
# begins each record (its value matches ^TennatId), so its running sum numbers the records
df1 = df.withColumn("mid", monotonically_increasing_id()).\
    withColumn("temp_win", when(col("rows").rlike("^TennatId"), lit(1)).otherwise(lit(0))).\
    withColumn("windw", sum(col("temp_win")).over(w))

# pivot on name within each group id, then drop the helper column
df1.groupBy("windw").pivot("name").agg(first(col("rows"))).drop("windw").show(10, False)
#+------------+-------------+-----+----------+------+------------+----------+----------------------------+-----+-----------------+---------------------------------+
#|ActivityType|CorrelationId|Name |Parameters|Source|SourceSystem|TenantId  |TimeGenerated               |Type |UserName         |_ResourceId                      |
#+------------+-------------+-----+----------+------+------------+----------+----------------------------+-----+-----------------+---------------------------------+
#|Connection  |608dd49a     |Name1|{}        |Client|Azure       |TennatId_1|2023-04-17T11:50:51.9013145Z|Check|test_1@test.cloud|/subscriptions/5286ce            |
#|Connection  |11c0d75f0000 |Name2|{}        |Client|Azure       |TennatId_2|2023-04-17T11:50:51.944022Z |Check|test_2@test.cloud|/subscriptions/5286ce38-272f-4c54|
#+------------+-------------+-----+----------+------+------------+----------+----------------------------+-----+-----------------+---------------------------------+
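For intuition, the running-sum trick above can be mirrored in plain Python (a sketch only, using a subset of the key/value rows; Spark does the same thing distributed via the window function): start a new record whenever a row's value matches the record-start pattern, then collect each record's key/value pairs into one dict, which plays the role of the pivoted row.

```python
# Plain-Python mirror of the window + running-sum grouping (illustration only).
rows = [
    ("TenantId", "TennatId_1"), ("TimeGenerated", "2023-04-17T11:50:51.9013145Z"),
    ("ActivityType", "Connection"), ("CorrelationId", "608dd49a"),
    ("Name", "Name1"),
    ("TenantId", "TennatId_2"), ("TimeGenerated", "2023-04-17T11:50:51.944022Z"),
    ("ActivityType", "Connection"), ("CorrelationId", "11c0d75f0000"),
    ("Name", "Name2"),
]

records = []
group_id = 0  # running sum of the record-start flag, like the windw column
for name, value in rows:
    if value.startswith("TennatId"):  # same test as rlike("^TennatId")
        group_id += 1                 # a new record begins here
        records.append({})
    records[-1][name] = value         # "pivot": each key/value pair becomes a column

print(len(records))                 # 2
print(records[0]["Name"])           # Name1
print(records[1]["CorrelationId"])  # 11c0d75f0000
```

Each dict in records corresponds to one row of the pivoted output above.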
Regarding "azure - Pyspark - pivot table", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/76084300/