Python: How to send an iterator to two different consumers without loading the entire iterator into memory?

Tags: python iterator coroutine

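I have an iterator that is consumed by two functions (mean_summarizer and std_summarizer in the example below). I want both functions to process the iterator without the whole iterator ever being loaded into memory at once.

Below is a minimal example (also in Colab) that gives the correct result, except that it loads the entire input into memory at once. There is no need to understand the fancy code inside mean_summarizer, std_summarizer, and last; it is written that way mainly for brevity.

The question is: what is the cleanest way to reimplement summarize_input_stream, without changing the function signatures (only the internals), so that its memory usage does not scale with the length of the input stream?

I have a feeling that coroutines are involved, but I don't know how to use them.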

import numpy as np
from typing import Iterable, Mapping, Callable, Any

def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    inputs = list(input_stream)  # PROBLEM IS HERE <-- We load entire stream into memory at once
    return {name: summarizer(inputs) for name, summarizer in summarizers.items()}

def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    return max(enumerate(iterable))[1]

def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])

def std_summarizer(stream: Iterable[float]) -> float:   # Just computes standard deviation online and returns final value
    return last(cumsum_of_sq/(i+1) - (cumsum/(i+1))**2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])**.5

summary_stats = summarize_input_stream(
    input_stream=(np.random.randn()*2+3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}

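Best answer

I found a solution that does not involve changing the signature of summarize_input_stream. It launches a thread for each summarizer and feeds each one incrementally through its own blocking queue (link to Colab).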
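
The piece that lets an unmodified summarizer read from a queue is the two-argument form of iter, which keeps calling a function until it returns a sentinel value. As a minimal sketch of just that idiom, separate from the answer's own code (the names SENTINEL, channel, and consume are made up for illustration):

```python
from queue import Queue
from threading import Thread

SENTINEL = object()  # hypothetical end-of-stream marker, playing the role of a poison pill


def consume(channel: Queue, out: list) -> None:
    # iter(callable, sentinel) calls channel.get() repeatedly and stops
    # as soon as the returned value equals SENTINEL, so sum() sees a
    # plain iterable and never knows a queue is involved.
    out.append(sum(iter(channel.get, SENTINEL)))


channel = Queue(maxsize=1)  # holds at most one pending item
out = []
worker = Thread(target=consume, args=(channel, out))
worker.start()
for value in [1.0, 2.0, 3.0]:
    channel.put(value)  # blocks while the previous item is still unread
channel.put(SENTINEL)   # tells the consumer that the stream has ended
worker.join()
print(out[0])
```

Because the queue is bounded, the producer can never get more than one item ahead of the consumer, which is what keeps memory use independent of the stream length. The full solution follows.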

import numpy as np
from typing import Iterable, Mapping, Callable, Any
from threading import Thread
from queue import Queue
from functools import partial


def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    POISON_PILL = object()
    def run_summarizer(summarizer: Callable[[Iterable[float]], float], queue: Queue) -> None:
        result = summarizer(iter(queue.get, POISON_PILL)) # Waits until the food is ready to eat
        queue.put(result)  # Use the queue the other way around to return the result
    queues = [Queue(maxsize=1) for _ in summarizers]  # <-- Note: a larger maxsize would probably be more time-efficient (less thread switching) at the cost of more memory usage
    threads = [Thread(target=partial(run_summarizer, summarizer, queue)) for summarizer, queue in zip(summarizers.values(), queues)]
    for t in threads:
        t.start()
    for inp in input_stream:
        for queue in queues:
            queue.put(inp)  # Waits until the summarizer is hungry to feed it
    for queue in queues:
        queue.put(POISON_PILL)  # Stop the iteration
    for t in threads:
        t.join()
    results = [queue.get() for queue in queues]
    return {name: result for name, result in zip(summarizers, results)}

def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    return max(enumerate(iterable))[1]

def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg * i / (i + 1) + x / (i + 1)])

def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
    return last(cumsum_of_sq / (i + 1) - (cumsum / (i + 1)) ** 2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in
                [(cumsum_of_sq + x ** 2, cumsum + x)]) ** .5

summary_stats = summarize_input_stream(
    input_stream=(np.random.randn() * 2 + 3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
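
Since the question mentions coroutines, here is a rough sketch of what a thread-free version could look like. It is not the answer's method, and it does not satisfy the constraint in the question: each summarizer has to be rewritten as a generator-based coroutine that receives values through send(), so the summarizer signatures change. The names mean_coroutine, std_coroutine, and summarize_with_coroutines are hypothetical, None is assumed to be a safe end-of-stream signal, and the stream is assumed to be non-empty.

```python
from typing import Callable, Dict, Generator, Iterable, Mapping

# A summarizer coroutine receives floats (or None to finish) and returns a float.
Summarizer = Generator[None, float, float]


def mean_coroutine() -> Summarizer:
    count, total = 0, 0.0
    while True:
        x = yield  # wait for the next value
        if x is None:  # end-of-stream signal (an assumption of this sketch)
            return total / count
        count += 1
        total += x


def std_coroutine() -> Summarizer:
    count, total, total_sq = 0, 0.0, 0.0
    while True:
        x = yield
        if x is None:
            # Population standard deviation, the same formula as std_summarizer
            return (total_sq / count - (total / count) ** 2) ** 0.5
        count += 1
        total += x
        total_sq += x ** 2


def summarize_with_coroutines(
        input_stream: Iterable[float],
        factories: Mapping[str, Callable[[], Summarizer]]
) -> Dict[str, float]:
    coroutines = {name: make() for name, make in factories.items()}
    for co in coroutines.values():
        next(co)  # prime each coroutine so it is paused at its first yield
    for x in input_stream:  # one item in memory at a time
        for co in coroutines.values():
            co.send(x)
    results = {}
    for name, co in coroutines.items():
        try:
            co.send(None)  # ask the coroutine to finish
        except StopIteration as stop:
            results[name] = stop.value  # the generator's return value
    return results


stats = summarize_with_coroutines(
    iter([1.0, 2.0, 3.0, 4.0]),
    {'mean': mean_coroutine, 'std': std_coroutine},
)
print(stats)
```

The trade-off is that this version needs no threads or queues, but existing summarizers that expect an Iterable cannot be reused as they are, which is why the threaded solution above is the one that meets the original requirement.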

Regarding "Python: How to send an iterator to two different consumers without loading the entire iterator into memory?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/73900617/
