python - 了解 pyspark 在 groupby 上的应用

标签 python pyspark

我正在使用pandas API在 Azure Databricks 上的 Spark 上,我有一个非常大的 pyspark.pandas.DataFrame,调用 data,包含可以识别的不同产品的销售信息(作为时间序列)通过sku_id列可以识别销售信息,可以通过target列来识别销售信息。我有一个函数 forecast,它为每个 sku_id 实现预测。我以为会像data.groupby("sku_id").target.apply(forecast)一样简单,但是进入forecast函数的时间序列是分区的,即 forecast 没有看到每个 sku 的完整时间序列。我知道像 len 之类的一些函数可以批量操作,然后以简单的方式合并结果( 添加每个分区的结果),预测则不然,它需要每个 sku 的整个时间序列。我检查了时间序列是否已分区,如果使用短序列评估预测,则会引发错误,如下所示

def forecast(y):
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

我确信对于每个 sku_id ,观察值都大于 100(更多),但是 apply 失败并出现 NotImplementedError ,所以时间序列正在被分区

所以我想了解如何在 groupby 操作上应用像 forecast 这样的方法,而不需要对时间序列进行分区。我还没有找到有关它的文档,我想知道这是否可以完成或者这是否是一个好的做法。

或者方法可能是其他的:如何对我的数据框进行分区,以便每个系列的 sku_id 位于同一分区中?

编辑

它适用于

  • applyInPandas 用于 Spark 数据框
  • Spark Pandas API并在预测函数中指定返回类型,运行没有问题,不进入raise
def forecast(y) -> float:
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

为什么会发生后者? documentation

this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in func, for instance, as below

可能它会尝试推断返回类型以在第一个实例中使用子样本定义架构?

最佳答案

如果你看code itself它提供了一些提示。该代码最终会引导您到达 where it infers schema. 。对我来说,这显示了您的代码可能会失败的地方。这是因为那时它会引发您的错误。

 first = rdd.first()

进一步深入,它还展示了如何关闭此行为并使用采样方法:spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

禁用此设置可以证明这确实是问题所在。如果您选择的样本大小至少大于所需的 len(100)

您还可以通过删除raise来解决该问题。这导致了这个问题。您仍然可以使用 accumulator 跟踪 100 以下的任何集合。 。这些是为了跟踪这种类型的数据异常情况而设计的,如果您不想搞砸事情,但仍然跟踪奇怪情况的计数(例如少于 100 个时间序列事件),那么这可能是一个更好的选择。

关于python - 了解 pyspark 在 groupby 上的应用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72997320/

相关文章:

python - Bootstrap 失败 : 5: Input/output error Error: Failure while executing

python - Pyspark dataframe 列包含字典数组,想要将字典中的每个键都变成一列

windows - Pyspark to_date() 函数在 Windows 和 WSL Ubuntu 上给出了不同的答案

scala - Spark 数据框将列值获取到字符串变量中

python - 对于一维 `X` , `X[n]` 和 `X[..., n]` 有什么区别?

python - 为 plone 4 实现一个简单的灵活性内容类型

python - 有没有办法从音频文件中删除/编辑名为 "tag"的元数据条目,而无需安装任何其他内容?

python - 对 Pandas 中的多列进行关联

apache-spark - 跨集群分布分区

apache-spark - 如何在 Pyspark 中对数据框进行排序