我正在使用pandas API在 Azure Databricks 上的 Spark 上,我有一个非常大的 pyspark.pandas.DataFrame
,调用 data
,包含可以识别的不同产品的销售信息(作为时间序列)通过sku_id
列可以识别销售信息,可以通过target
列来识别销售信息。我有一个函数 forecast
,它为每个 sku_id
实现预测。我以为会像data.groupby("sku_id").target.apply(forecast)一样简单,但是进入forecast函数的时间序列是分区的,即 forecast
没有看到每个 sku 的完整时间序列。我知道像 len 之类的一些函数可以批量操作,然后以简单的方式合并结果(
添加每个分区的结果),预测
则不然,它需要每个 sku 的整个时间序列。我检查了时间序列是否已分区,如果使用短序列评估预测,则会引发错误,如下所示
def forecast(y):
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...
我确信对于每个 sku_id
,观察值都大于 100(更多),但是 apply
失败并出现 NotImplementedError
,所以时间序列正在被分区
所以我想了解如何在 groupby 操作上应用像 forecast
这样的方法,而不需要对时间序列进行分区。我还没有找到有关它的文档,我想知道这是否可以完成或者这是否是一个好的做法。
或者方法可能是其他的:如何对我的数据框进行分区,以便每个系列的 sku_id
位于同一分区中?
编辑
它适用于
applyInPandas
用于 Spark 数据框- Spark Pandas API并在预测函数中指定返回类型,运行没有问题,不进入raise
def forecast(y) -> float:
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...
为什么会发生后者? documentation说
this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in func, for instance, as below
可能它会尝试推断返回类型以在第一个实例中使用子样本定义架构?
最佳答案
如果你看code itself它提供了一些提示。该代码最终会引导您到达 where it infers schema. 。对我来说,这显示了您的代码可能会失败的地方。这是因为那时它会引发您的错误。
first = rdd.first()
进一步深入,它还展示了如何关闭此行为并使用采样方法:spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled
禁用此设置可以证明这确实是问题所在。如果您选择的样本大小至少大于所需的 len(100)
您还可以通过删除raise
来解决该问题。这导致了这个问题。您仍然可以使用 accumulator 跟踪 100 以下的任何集合。 。这些是为了跟踪这种类型的数据异常情况而设计的,如果您不想搞砸事情,但仍然跟踪奇怪情况的计数(例如少于 100 个时间序列事件),那么这可能是一个更好的选择。
关于python - 了解 pyspark 在 groupby 上的应用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72997320/