python - Python 中的统计累加器

标签 python oop statistics accumulator

统计累加器允许执行增量计算。例如,为了计算任意时间给定的数字流的算术平均值,可以创建一个对象来跟踪给定项目的当前数量 n 及其总和 sum。当请求均值时,该对象仅返回 sum/n

像这样的累加器允许您在某种意义上进行增量计算,即当给定一个新数字时,您不需要重新计算整个总和和计数。

可以为其他统计信息编写类似的累加器(对于 C++ 实现,请参见 boost library)。

您将如何在 Python 中实现累加器? The code I came up with是:

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

有趣的设计问题出现了:

  1. 如何制作蓄能器 线程安全?
  2. 如何安全移除 项目?
  3. 如何以某种方式进行架构 允许其他统计数据 轻松插入(统计工厂)

最佳答案

对于通用的、线程安全的高级函数,您可以将类似以下内容与 Queue.Queue 类和其他一些位结合使用:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

这个生成器函数通过将它委托(delegate)给其他实体来逃避它声称的大部分责任:

  • 它依赖于 Queue.Queue 以线程安全的方式提供其源元素
  • 一个collections.deque对象可以作为storage参数的值传入;除其他外,这提供了一种仅使用最后一个 n(在本例中为 3)个值
  • 的便捷方式
  • 函数本身(在本例中为 mean)作为参数传递。在某些情况下,这会导致代码效率不高,但很容易应用于各种情况。

请注意,如果您的生产者线程每个值花费的时间超过 0.1 秒,则累加器可能会超时。这很容易通过传递更长的超时或通过完全删除超时参数来补救。在后一种情况下,该函数将无限期地阻塞在队列的末尾;这种用法在子线程(通常是 daemon 线程)中使用时更有意义。当然,您也可以将传递给 q.get 的参数参数化为 Accumulator 的第四个参数。

如果你想从生产者线程(这里是 putting_thread)传递队列的末端,即没有更多的值要来,你可以传递并检查一个哨兵值或使用一些其他方法。 this thread 中有更多信息;我选择编写一个名为 CloseableQueue 的 Queue.Queue 子类提供了一个 close 方法。

还有许多其他方法可以自定义此类函数的行为,例如通过限制队列大小;这只是一个用法示例。

编辑

如上所述,由于需要重新计算,这会降低一些效率,而且我认为,这并不能真正回答您的问题。

生成器函数也可以通过其 send 方法接受值。所以你可以写一个均值生成器函数,比如

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

这里的 yield 表达式将值——send 的参数——带入函数,同时将计算出的值作为 send 的返回值传递出去。

您可以将调用该函数返回的生成器传递给更可优化的累加器生成器函数,如下所示:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass

关于python - Python 中的统计累加器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3774315/

相关文章:

machine-learning - 有偏差的初始数据集主动学习

python - 使用 Tkinter 将用户输入导出到 .csv 文件

java - 使用反射在方法中检索类

r - == 与公式的奇怪行为

r - 变量与 R 独立性的卡方检验

java - 将用户数据保存在文本文件或序列化对象中

python - session 未提交 - checkout 12b - Flask Web 开发

python - Django 数据库路由

python - 如何检查变量是否包含不允许输入的字符?

C#抽象类有另一个抽象类对象