目标: 找到一种有效/最快的方法来按列迭代表并在每列上运行函数,在 python 中或使用 python 库。
背景: 我一直在探索提高函数速度的方法。这是因为我有两个模型/算法,我想运行一个小的,一个大的(使用 torch ),而大的很慢。我一直用小号进行测试。小模型是每列的季节分解。
设置:
Testing environment: ec2, t2 large. X86_64
Python version: 3.11.5
Polars: 0.19.13
pandas: 2.1.1
numpy: 1.26.0
pandas/polls 中的演示数据:
rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df) # convert if needed.
Pandas
Pandas 和字典:
from statsmodels.tsa.seasonal import seasonal_decompose
import pandas as pd
class pdDictTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
trend_data_dict = {}
for column in dataframe.columns:
trend_data_dict[column] = cls().process_col(dataframe[column])
trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
return trend_dataframes
import timeit
start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
程序在14.349091062998923执行
使用列表理解而不是 for 循环:
class pdDict2TrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> pd.Series:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
return trend_dataframes
程序在14.343959668000025执行
使用 pandas 和 torch 的类:
from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd
class pdTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return torch.tensor(trend, dtype=torch.float32).view(-1, 1)
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> torch.Tensor:
trend_dataframes = torch.Tensor()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
return trend_dataframes
start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df_p)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
程序在23.14214362200073执行
使用字典、多重处理和列表理解: 正如下面@roganjosh 和@jquurious 所建议的。
from multiprocessing import Pool
class pdMTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> pd.Series:
result = seasonal_decompose(column_data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
with Pool() as pool:
trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))
return pd.DataFrame(trend_data_dict, index=dataframe.index)
程序在4.582350738997775中执行,又好又快。
极地
北极光和手电筒:
class plTorTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pl.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend[np.isnan(result.trend)] = 0
return torch.tensor(trend, dtype=torch.float32).view(-1, 1)
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> torch.Tensor:
trend_dataframes = torch.Tensor()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
return trend_dataframes
程序在13.813817326999924执行
极坐标和 lamdba:
start = timeit.default_timer()
df_p = df_p.select([
pl.all().map_batches(lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)).fill_nan(0)
]
)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
程序在82.5596211330012执行
我怀疑这写得不好,也是它这么慢的原因。我还没有找到更好的方法。
到目前为止,我已经尝试过 apply_many、apply、map、map_batches 或 map_elements..with_columns 与 select 以及其他一些组合。
仅限极坐标,for 循环:
class plTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
# Handle missing values by replacing NaN with 0
result.trend[np.isnan(result.trend)] = 0
return pl.DataFrame({column_data.name: result.trend})
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
trend_dataframes = pl.DataFrame()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = trend_dataframes.hstack(trend_data)
return trend_dataframes
程序在13.34212675299932执行
使用列表理解:
我尝试使用极坐标和列表理解。但在使用 Polars 语法时遇到困难。
使用字典和 for 循环:
程序在13.743039597999996执行
带有字典和列表理解:
class plDict2TrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pl.Series = None) -> pl.Series:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
result.trend[np.isnan(result.trend)] = 0
return pl.Series(result.trend)
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
trend_dataframes = pl.DataFrame(trend_data_dict)
return trend_dataframes
程序在13.008102383002552执行
使用字典、多重处理和列表理解: 正如下面@roganjosh 和@jquurious 所建议的。
from multiprocessing import Pool
class plMTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pl.Series = None) -> pl.Series:
result = seasonal_decompose(column_data, model=self._model, period=self._period)
result.trend[np.isnan(result.trend)] = 0
return pl.Series(result.trend)
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
with Pool() as pool:
trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))
return pl.DataFrame(trend_data_dict)
程序在4.997288776001369执行,很好!
使用lazyFrame?
我可以将惰性和收集添加到上面的df_p.select()
方法中,但这样做并不能缩短时间。关键问题之一似乎是传递给惰性操作的函数也需要惰性。我希望它可以并行运行每一列。
当前结论和注释
- 在某些运行中,我得到了一秒到半秒的变化。
- Pandas 和 dict,似乎是合理的。如果您关心索引,那么这可能是一个不错的选择。
- 具有字典和列表理解的 Polar 是“最快的”。但幅度不大。考虑到差异甚至更小。
- 这两个选项还具有不需要额外软件包的优点。
- Polars 似乎还有改进的空间。就更好的代码而言,但不确定这是否会大大缩短时间。主要的计算时间是seasonal_decompose。如果单独运行,每列大约需要 0.012 秒。
- 欢迎任何有关改进的反馈
- 警告:我还没有对上述函数进行完整的输出验证。
- 如何从 process_col 返回变量对速度影响很小。正如预期的那样,也是我在这里调整的一部分。例如,对于极坐标,如果我返回 numpy 数组,我的时间就会变慢。如果我返回一个 numpy 数组,但声明 -> pl.series,这看起来速度大致相同,其中一两次试验更快(然后上面)。
反馈/添加多重处理后
- 惊喜,多重处理才能取胜。这似乎与 Pandas 或极地无关。
最佳答案
对于 Polars,在这种情况下使用 .select()
和 .map_batches()
是一种“反模式”.
您将所有数据通过 Polars 表达式引擎,将其传回 Python 以运行外部函数,然后再次将其传回 Polars。
您可以绕过它,只需将每个 Series
直接传递给 seasonal_decompose()
(类似于 Pandas 方法中循环遍历每一列的方式):
pl.DataFrame({
col.name: seasonal_decompose(col, model="Additive", period=365).trend
for col in df_p
})
我确实注意到的一件事是,如果您从每列创建一个 LazyFrame 并使用 pl.collect_all()
它将 .map_batches()
方法的速度提高了约 50%。 (也许可以对此进行调查。)
(虽然还是比理解慢一点。)
lf = df_p.lazy()
lazy_columns = [
lf.select(pl.col(col).map_batches(
lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend))
)
for col in lf.columns
]
out = pl.concat(pl.collect_all(lazy_columns), how="horizontal")
本质上,问题变成了“如何并行化 Python for 循环?”
正如 @roganjosh 指出的那样,这是通过 multiprocessing. 完成的
from multiprocessing import get_context
...
if __name__ == "__main__":
df_p = ...
with get_context("spawn").Pool() as pool:
columns = pool.map(process_column, (col for col in df_p))
出于兴趣,该示例在多处理方面比常规理解的运行速度快了约 50%。
但它是特定于任务/数据/平台的,因此您可以在本地对其进行基准测试。
关于python - pandas、polars 或 torch 中函数的高效迭代和应用?偷懒可能吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77484060/