python - Python 中高效的时间限制队列?

标签 python caching data-structures stream queue

在实时流数据上复制 pandas 滚动窗口功能的有效方法是什么?

假设我们想要维护最后n个观察值的总和,带有maxlen参数的collections.deque是可行的方法,但是什么如果我们需要最后 m 秒的值的总和而不是固定的 n

使用pandas.Series效率很低,因为底层numpy数组是不可变的,因此不适合处理实时数据流 - 每个追加都会复制整个数组。

cachetools.TTLCache这样的东西适合存储实时数据,但计算效率低下 - 获取总和需要每次迭代每个元素。

目前,我维护实时数据流总和的方法使用 collections.dequetime-to-live 参数和 while 循环丢弃旧值:

import time
from collections import deque

class Accumulator:
    def __init__(self, ttl):
        self.ttl = ttl
        self.sum = 0
        self.q = deque()

    def append(self, value):
        self.q.append((time.time(), value))
        self.sum += value
        self.discard_old_values()

    def discard_old_values(self):
        cutoff_time = time.time() - self.ttl
        try:
            while self.q[0][0] < cutoff_time:
                self.sum -= self.q.popleft()[1]
        except IndexError:
            pass

    def get_sum(self):
        self.discard_old_values()
        return self.sum

所以问题是:

  1. 我们可以做得更好吗?
  2. 有没有办法摆脱这里丑陋的 while 循环?
  3. 这种类型的“TTL 队列”数据结构的通用名称是什么?
  4. 是否有一个流行的 Python 库已经实现了它?
  5. 有没有办法在可变集合上利用 pandas 滚动窗口?

最佳答案

这是我对 discard_old_values 提出的改进:

def discard_old_values(self):
    die = time.time() - self.ttl
    while(len(self.q) > 0 and self.q[0][0] < die):
        self.sum -= self.q.popleft()[1]

我决定不对每个可能被丢弃的元素调用 time.time(),而是调用一次并进行算术运算以仅找到一次“死亡时间”。

这样就节省了抛出异常的成本,以及多次调用的时间成本。

关于python - Python 中高效的时间限制队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51485656/

相关文章:

c# - 从 Xml 文件读取最有效和缓存的方式

javascript - 为什么 Redis (Nodejs) 在将新项目添加到数据库后会自动更新其缓存?

c# - 未知大小的数组,包含具有可变数量的值/属性的项目

iphone - 多结局故事的数据结构

python - 使pycaffe fatal error : 'Python.h' file not found

python - 拟合 svm 模型 errorValueError : could not convert string to float: '[array([0. 30067509、0.11679184、0.01250501 等

caching - 使用 memcached 和 rdbms(如 MySQL)时的缓存一致性

javascript - 优化: find from a flattened recursive structure by index

Python 多处理传递数据库游标对象

python - 如何在Python中分割一个巨大的文本数据集?