pyspark - 在 Spark SQL 中为每一分钟的差异创建一个新行

标签 pyspark apache-spark-sql

考虑我的数据:

+---+-------------------+-------------------+
| id|          starttime|            endtime|
+---+-------------------+-------------------+
|  1|1970-01-01 07:00:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+

基于此,我想要一个 sql 查询,它为结束时间和开始时间之间的每一分钟差异创建一行,使我的数据完全像这样结束:

+---+-------------------+-------------------+
| id|          starttime|            endtime|
+---+-------------------+-------------------+
|  1|1970-01-01 07:00:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+
|  1|1970-01-01 07:01:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+
|  1|1970-01-01 07:02:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+
|  1|1970-01-01 07:03:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+

我对sql有很强的偏好,但如果不可能,你可以使用pyspark。

最佳答案

试试这个:

import pyspark.sql.functions as f
df.show()
+---+-------------------+-------------------+
| id|          starttime|            endtime|
+---+-------------------+-------------------+
|  1|1970-01-01 07:00:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+

#df.printSchema()
# root
# |-- id: long (nullable = true)
# |-- starttime: timestamp (nullable = true)
# |-- endtime: timestamp (nullable = true)

exprsequence以一分钟的间隔组合起来,将为您提供分钟的时间戳数组,然后将其分解以按行进行转换。

df.select('id', f.explode(f.expr('sequence(starttime, endtime, interval 1 minute)')).alias('starttime'), 'endtime' ).show(truncate=False)
+---+-------------------+-------------------+
|id |starttime          |endtime            |
+---+-------------------+-------------------+
|1  |1970-01-01 07:00:00|1970-01-01 07:03:00|
|1  |1970-01-01 07:01:00|1970-01-01 07:03:00|
|1  |1970-01-01 07:02:00|1970-01-01 07:03:00|
|1  |1970-01-01 07:03:00|1970-01-01 07:03:00|
+---+-------------------+-------------------+

关于pyspark - 在 Spark SQL 中为每一分钟的差异创建一个新行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60170879/

相关文章:

azure - 如何在 pyspark 中使用 azure-sqldb-spark 连接器

python - 只有单线程使用多处理池执行与PySpark的并行SQL查询

scala - 尝试保存 Spark SQL Dataframes 总是导致空目录

apache-spark - 如何通过 Spark 属性(Spark 1.6)在 spark-shell 中启用或禁用 Hive 支持?

amazon-web-services - 如何在现有的Apache Spark独立群集上安装Apache Zeppelin

azure - Azure Blob 存储和 Azure databricks 之间的高效数据检索过程

sql - 在 HIVE SQL 中使用 cte 时出错 - java.lang

scala - 如何更改SparkContext.sparkUser()设置(在pyspark中)?

java - Dataframe 未保存到 Cassandra 中

apache-spark - 为什么在 DataFrame 上使用 union()/coalesce(1,false) 时会在 Spark 中混洗大量数据?