我正在从美联储经济数据集 API 导入数据。每个请求都会返回每日、每周、每月或每年的时间序列。我的最终目标是进行变量选择并构建一个基于贝叶斯的模型,该模型使用选定的时间序列作为特定时间序列的预测变量。将这些数据构建为数据框架的最佳方法是什么?
根据这个documentation我认为我的数据应该放在“即时”format中。然而,在尝试加入超过 200,000 个列后,我为此所做的尝试最终都变得异常缓慢。下面文档引用中的另一种格式是“TimeSeriesRDD”,但导入的时间序列通常没有日期重叠,范围从 1930 年至今。那么,将这些数据构建为数据框架的最佳方法是什么?
如果有一个如何将 FRED 中的数据加载为您推荐的格式的示例,我们将不胜感激!
这是我的第一个方法,速度非常慢
for (seriesId <- allSeries) {
val series = loadSeriesFromAPI(seriesId, spark)
allSeries = allSeries.join(series, allSeries.col("date") === series.col(seriesId + "_date"), "outer")
allSeries = allSeries.drop(seriesId + "_date")
}
第二个是我必须一次加载 1 列和 1 行数据
for(row <- series) {
val insertStr = "%s, %g".
format(
row.asInstanceOf[Map[String, Date]]("date").asInstanceOf[String],
parseDoubleOrZero(row.asInstanceOf[Map[String, Double]]("value").asInstanceOf[String])
)
}
最佳答案
拥有一个包含 200.000 列的 DataFrame 并不是一个好主意。我建议的一件事是将问题分开一点,不要混合太多技术:
- 数据摄取:您的系列实际上有多大?尽可能避免连接(连接意味着洗牌,洗牌意味着网络,这将使一切变慢)。如果适合的话,我会使用 Scala 收集数据并将其保存在内存中,如果不适合,我仍然会在 Scala 中收集批量的系列,并将每个批处理转换为 Spark DataFrame。
- 数据帧创建:如果您设法将数据放入内存中,那么您可以尝试使用以下代码片段来创建数据帧:
case class Point(timestamp: Long, value: Long)
case class Series(id: String, points: List[Point])
val s1 = Series("s1", List(Point(1, 100), Point(2, 200), Point(3, 100)))
val s2 = Series("s2", List(Point(1, 1000), Point(3, 100)))
val seriesDF = sc.parallelize(Array(s1, s2)).toDF
seriesDF.show()
seriesDF.select($"id", explode($"points").as("point"))
.select($"id", $"point.timestamp", $"point.value")
.show()
输出:
+---+--------------------+
| id| points|
+---+--------------------+
| s1|[[1,100], [2,200]...|
| s2| [[1,1000], [3,100]]|
+---+--------------------+
+---+---------+-----+
| id|timestamp|value|
+---+---------+-----+
| s1| 1| 100|
| s1| 2| 200|
| s1| 3| 100|
| s2| 1| 1000|
| s2| 3| 100|
+---+---------+-----+
对于处理时间序列的更奇特的方式,我推荐以下项目:https://github.com/twosigma/flint
关于json - 在 Spark DataFrame 中布局 TimeSeries 数据的最佳方式 - Scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41819417/