在实时流数据上复制 pandas
滚动窗口功能的有效方法是什么?
假设我们想要维护最后n个观察值的总和,带有maxlen
参数的collections.deque
是可行的方法,但是什么如果我们需要最后 m 秒的值的总和而不是固定的 n ?
使用pandas.Series
效率很低,因为底层numpy
数组是不可变的,因此不适合处理实时数据流 - 每个追加都会复制整个数组。
像cachetools.TTLCache
这样的东西适合存储实时数据,但计算效率低下 - 获取总和需要每次迭代每个元素。
目前,我维护实时数据流总和的方法使用 collections.deque
、time-to-live 参数和 while
循环丢弃旧值:
import time
from collections import deque
class Accumulator:
def __init__(self, ttl):
self.ttl = ttl
self.sum = 0
self.q = deque()
def append(self, value):
self.q.append((time.time(), value))
self.sum += value
self.discard_old_values()
def discard_old_values(self):
cutoff_time = time.time() - self.ttl
try:
while self.q[0][0] < cutoff_time:
self.sum -= self.q.popleft()[1]
except IndexError:
pass
def get_sum(self):
self.discard_old_values()
return self.sum
所以问题是:
- 我们可以做得更好吗?
- 有没有办法摆脱这里丑陋的
while
循环? - 这种类型的“TTL 队列”数据结构的通用名称是什么?
- 是否有一个流行的 Python 库已经实现了它?
- 有没有办法在可变集合上利用
pandas
滚动窗口?
最佳答案
这是我对 discard_old_values
提出的改进:
def discard_old_values(self):
die = time.time() - self.ttl
while(len(self.q) > 0 and self.q[0][0] < die):
self.sum -= self.q.popleft()[1]
我决定不对每个可能被丢弃的元素调用 time.time()
,而是调用一次并进行算术运算以仅找到一次“死亡时间”。
这样就节省了抛出异常的成本,以及多次调用的时间成本。
关于python - Python 中高效的时间限制队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51485656/