python - pandas、polars 或 torch 中函数的高效迭代和应用?偷懒可能吗?

标签 python python-3.x pandas pytorch python-polars

目标: 找到一种有效/最快的方法来按列迭代表并在每列上运行函数,在 python 中或使用 python 库。

背景: 我一直在探索提高函数速度的方法。这是因为我有两个模型/算法,我想运行一个小的,一个大的(使用 torch ),而大的很慢。我一直用小号进行测试。小模型是每列的季节分解。

设置:

Testing environment: ec2, t2 large. X86_64
Python version: 3.11.5 
Polars: 0.19.13 
pandas: 2.1.1 
numpy: 1.26.0

pandas/polls 中的演示数据:

rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df) # convert if needed.

Pandas

Pandas 和字典:

from statsmodels.tsa.seasonal import seasonal_decompose
import pandas as pd

class pdDictTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {}
        for column in dataframe.columns:
                trend_data_dict[column] = cls().process_col(dataframe[column])
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes

import timeit
start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))

程序在14.349091062998923执行

使用列表理解而不是 for 循环:

class pdDict2TrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> pd.Series:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes

程序在14.343959668000025执行

使用 pandas 和 torch 的类:

from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd

class pdTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes


start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df_p)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))

程序在23.14214362200073执行

使用字典、多重处理和列表理解: 正如下面@roganjosh 和@jquurious 所建议的。

from multiprocessing import Pool

class pdMTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> pd.Series:
        result = seasonal_decompose(column_data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
        with Pool() as pool:
            trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))

        return pd.DataFrame(trend_data_dict, index=dataframe.index)

程序在4.582350738997775中执行,又好又快。

极地

北极光和手电筒:

class plTorTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pl.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend[np.isnan(result.trend)] = 0
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes

程序在13.813817326999924执行

极坐标和 lamdba:

start = timeit.default_timer()

df_p = df_p.select([
    pl.all().map_batches(lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)).fill_nan(0)
]
)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))

程序在82.5596211330012执行

我怀疑这写得不好,也是它这么慢的原因。我还没有找到更好的方法。

到目前为止,我已经尝试过 apply_many、apply、map、map_batches 或 map_elements..with_columns 与 select 以及其他一些组合。

仅限极坐标,for 循环:

class plTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        
        # Handle missing values by replacing NaN with 0
        result.trend[np.isnan(result.trend)] = 0
        return pl.DataFrame({column_data.name: result.trend})

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_dataframes = pl.DataFrame()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = trend_dataframes.hstack(trend_data)
        return trend_dataframes

程序在13.34212675299932执行

使用列表理解:

我尝试使用极坐标和列表理解。但在使用 Polars 语法时遇到困难。

使用字典和 for 循环:

程序在13.743039597999996执行

带有字典和列表理解:

class plDict2TrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pl.Series = None) ->  pl.Series:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        result.trend[np.isnan(result.trend)] = 0
        return pl.Series(result.trend)

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
        trend_dataframes = pl.DataFrame(trend_data_dict)
        return trend_dataframes

程序在13.008102383002552执行

使用字典、多重处理和列表理解: 正如下面@roganjosh 和@jquurious 所建议的。

from multiprocessing import Pool

class plMTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pl.Series = None) -> pl.Series:
        result = seasonal_decompose(column_data, model=self._model, period=self._period)
        result.trend[np.isnan(result.trend)] = 0
        return pl.Series(result.trend)

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
        with Pool() as pool:
            trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))

        return pl.DataFrame(trend_data_dict)

程序在4.997288776001369执行,很好!

使用lazyFrame?

我可以将惰性和收集添加到上面的df_p.select()方法中,但这样做并不能缩短时间。关键问题之一似乎是传递给惰性操作的函数也需要惰性。我希望它可以并行运行每一列。

当前结论和注释

  • 在某些运行中,我得到了一秒到半秒的变化。
  • Pandas 和 dict,似乎是合理的。如果您关心索引,那么这可能是一个不错的选择。
  • 具有字典和列表理解的 Polar 是“最快的”。但幅度不大。考虑到差异甚至更小。
  • 这两个选项还具有不需要额外软件包的优点。
  • Polars 似乎还有改进的空间。就更好的代码而言,但不确定这是否会大大缩短时间。主要的计算时间是seasonal_decompose。如果单独运行,每列大约需要 0.012 秒。
  • 欢迎任何有关改进的反馈
  • 警告:我还没有对上述函数进行完整的输出验证。
  • 如何从 process_col 返回变量对速度影响很小。正如预期的那样,也是我在这里调整的一部分。例如,对于极坐标,如果我返回 numpy 数组,我的时间就会变慢。如果我返回一个 numpy 数组,但声明 -> pl.series,这看起来速度大致相同,其中一两次试验更快(然后上面)。

反馈/添加多重处理后

  • 惊喜,多重处理才能取胜。这似乎与 Pandas 或极地无关。

最佳答案

对于 Polars,在这种情况下使用 .select().map_batches() 是一种“反模式”.

您将所有数据通过 Polars 表达式引擎,将其传回 Python 以运行外部函数,然后再次将其传回 Polars。

您可以绕过它,只需将每个 Series 直接传递给 seasonal_decompose() (类似于 Pandas 方法中循环遍历每一列的方式):

pl.DataFrame({
    col.name: seasonal_decompose(col, model="Additive", period=365).trend
    for col in df_p
})

我确实注意到的一件事是,如果您从每列创建一个 LazyFrame 并使用 pl.collect_all()它将 .map_batches() 方法的速度提高了约 50%。 (也许可以对此进行调查。)

(虽然还是比理解慢一点。)

lf = df_p.lazy()

lazy_columns = [ 
    lf.select(pl.col(col).map_batches(
        lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend))
    ) 
    for col in lf.columns 
]

out = pl.concat(pl.collect_all(lazy_columns), how="horizontal")

本质上,问题变成了“如何并行化 Python for 循环?”

正如 @roganjosh 指出的那样,这是通过 multiprocessing. 完成的

from multiprocessing import get_context

...

if __name__ == "__main__":

    df_p = ...

    with get_context("spawn").Pool() as pool:
        columns = pool.map(process_column, (col for col in df_p))

出于兴趣,该示例在多处理方面比常规理解的运行速度快了约 50%。

但它是特定于任务/数据/平台的,因此您可以在本地对其进行基准测试。

关于python - pandas、polars 或 torch 中函数的高效迭代和应用?偷懒可能吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77484060/

相关文章:

python - 如何用按顺序开始的自然数填充 nan 列?

python - Google Drive API 删除 Python

pandas - 根据第一行的条件分组并应用 lambda - Pandas

python - Groupby 保持组间顺序?以何种方式?

python - 如何知道何时使用 numpy.linalg 而不是 scipy.linalg?

python - 使用 float 将正态分布拟合到加权数据

python - 为什么 "else"子句的目的是在 "for"或 "while"循环之后?

python - 初学者 - GUI 切换按钮

python - 改变 turtle 图形的操作

python - 如何在fstrings中使用.loc?