json - 在 Spark DataFrame 中布局 TimeSeries 数据的最佳方式 - Scala

标签 json scala apache-spark time-series apache-spark-sql

我正在从美联储经济数据集 API 导入数据。每个请求都会返回每日、每周、每月或每年的时间序列。我的最终目标是进行变量选择并构建一个基于贝叶斯的模型,该模型使用选定的时间序列作为特定时间序列的预测变量。将这些数据构建为数据框架的最佳方法是什么?

根据这个documentation我认为我的数据应该放在“即时”format中。然而,在尝试加入超过 200,000 个列后,我为此所做的尝试最终都变得异常缓慢。下面文档引用中的另一种格式是“TimeSeriesRDD”,但导入的时间序列通常没有日期重叠,范围从 1930 年至今。那么,将这些数据构建为数据框架的最佳方法是什么?

如果有一个如何将 FRED 中的数据加载为您推荐的格式的示例,我们将不胜感激!

这是我的第一个方法,速度非常慢

for (seriesId <- allSeries) {
    val series = loadSeriesFromAPI(seriesId, spark)
    allSeries = allSeries.join(series, allSeries.col("date") === series.col(seriesId + "_date"), "outer")
    allSeries = allSeries.drop(seriesId + "_date")
}

第二个是我必须一次加载 1 列和 1 行数据

for(row <- series) {
  val insertStr = "%s, %g".
    format(
      row.asInstanceOf[Map[String, Date]]("date").asInstanceOf[String],
      parseDoubleOrZero(row.asInstanceOf[Map[String, Double]]("value").asInstanceOf[String])
    )
}

最佳答案

拥有一个包含 200.000 列的 DataFrame 并不是一个好主意。我建议的一件事是将问题分开一点,不要混合太多技术:

  1. 数据摄取:您的系列实际上有多大?尽可能避免连接(连接意味着洗牌,洗牌意味着网络,这将使一切变慢)。如果适合的话,我会使用 Scala 收集数据并将其保存在内存中,如果不适合,我仍然会在 Scala 中收集批量的系列,并将每个批处理转换为 Spark DataFrame。
  2. 数据帧创建:如果您设法将数据放入内存中,那么您可以尝试使用以下代码片段来创建数据帧:
case class Point(timestamp: Long, value: Long)
case class Series(id: String, points: List[Point])

val s1 = Series("s1", List(Point(1, 100), Point(2, 200), Point(3, 100)))
val s2 = Series("s2", List(Point(1, 1000), Point(3, 100)))

val seriesDF = sc.parallelize(Array(s1, s2)).toDF
seriesDF.show()

seriesDF.select($"id", explode($"points").as("point"))
    .select($"id", $"point.timestamp", $"point.value")
    .show()

输出:

+---+--------------------+
| id|              points|
+---+--------------------+
| s1|[[1,100], [2,200]...|
| s2| [[1,1000], [3,100]]|
+---+--------------------+
+---+---------+-----+
| id|timestamp|value|
+---+---------+-----+
| s1|        1|  100|
| s1|        2|  200|
| s1|        3|  100|
| s2|        1| 1000|
| s2|        3|  100|
+---+---------+-----+

对于处理时间序列的更奇特的方式,我推荐以下项目:https://github.com/twosigma/flint

关于json - 在 Spark DataFrame 中布局 TimeSeries 数据的最佳方式 - Scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41819417/

相关文章:

dataframe - 如何按列值在pyspark df中添加更多行

python - 两个 pyspark 数据帧的余弦相似度

java Linux服务器: class not found exception though jar in classpath

json - SwiftyJSON实现json字符串到swift对象

java - Jython:解析JSON对象以获取值(对象有数组)

scala - SBT - 姓名和身份证有什么区别?

json - 检查 WP REST API 的 JWT 身份验证中的 JSON 对象是否为 null

scala - 如何以负步长迭代范围?

scala - 用于包装不纯方法的效果?

scala - 在Spark中读取压缩的xml文件