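I have an iterator that is consumed by two functions (mean_summarizer and std_summarizer in the example below). I want both functions to process the iterator without the whole iterator ever being loaded into memory at once.
Below is a minimal example (also in Colab) that gives the correct result, except that it loads the entire input into memory at once. There is no need to understand the fancy code inside mean_summarizer, std_summarizer, and last; it is written that way mainly for brevity.
The question: without changing the function signatures (only the internals), what is the cleanest way to reimplement summarize_input_stream so that its memory usage does not scale with the length of the input stream?
I have a feeling that coroutines are involved, but I don't know how to use them.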
import numpy as np
from typing import Iterable, Mapping, Callable, Any


def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
    input_stream: Iterable[float],
    summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    inputs = list(input_stream)  # PROBLEM IS HERE <-- We load entire stream into memory at once
    return {name: summarizer(inputs) for name, summarizer in summarizers.items()}


def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    return max(enumerate(iterable))[1]


def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])


def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
    return last(cumsum_of_sq/(i+1) - (cumsum/(i+1))**2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])**.5


summary_stats = summarize_input_stream(
    input_stream=(np.random.randn()*2+3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
Best answer
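I found a solution that does not involve changing the signature of summarize_input_stream. It starts one thread per summarizer and feeds each thread incrementally through its own blocking queue (link to Colab).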
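The mechanism that lets an unmodified summarizer pull from a queue is the two-argument form of the built-in iter, which keeps calling a function until it returns a sentinel value. As a minimal sketch of just that idiom (the names SENTINEL and consume are illustrative and not part of the solution, and sum stands in for a real summarizer):

```python
from queue import Queue
from threading import Thread

SENTINEL = object()  # illustrative name; plays the role of a "poison pill"


def consume(q: Queue, out: list) -> None:
    # iter(callable, sentinel) calls q.get() repeatedly and stops
    # as soon as the returned value equals SENTINEL.
    out.append(sum(iter(q.get, SENTINEL)))


q = Queue(maxsize=1)  # holds at most one pending item
out = []
worker = Thread(target=consume, args=(q, out))
worker.start()
for x in (1.0, 2.0, 3.0):
    q.put(x)     # blocks while the previous item is still waiting in the queue
q.put(SENTINEL)  # ends the consumer's iteration
worker.join()
total = out[0]
print(total)
```

Because the queue is bounded, the producer can never run more than one item ahead of the consumer, which is what keeps memory usage constant.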
import numpy as np
from typing import Iterable, Mapping, Callable, Any
from threading import Thread
from queue import Queue
from functools import partial


def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
    input_stream: Iterable[float],
    summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    POISON_PILL = object()

    def run_summarizer(summarizer: Callable[[Iterable[float]], float], queue: Queue) -> None:
        result = summarizer(iter(queue.get, POISON_PILL))  # Waits until the food is ready to eat
        queue.put(result)  # Use the queue the other way around to return the result

    # Note: we could probably be more time-efficient by increasing maxsize, which should
    # cause less thread switching at the cost of more memory usage
    queues = [Queue(maxsize=1) for _ in summarizers]
    threads = [Thread(target=partial(run_summarizer, summarizer, queue)) for summarizer, queue in zip(summarizers.values(), queues)]
    for t in threads:
        t.start()
    for inp in input_stream:
        for queue in queues:
            queue.put(inp)  # Waits until the summarizer is hungry to feed it
    for queue in queues:
        queue.put(POISON_PILL)  # Stop the iteration
    for t in threads:
        t.join()
    results = [queue.get() for queue in queues]
    return {name: result for name, result in zip(summarizers, results)}


def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    return max(enumerate(iterable))[1]


def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg * i / (i + 1) + x / (i + 1)])


def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
    return last(cumsum_of_sq / (i + 1) - (cumsum / (i + 1)) ** 2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in
                [(cumsum_of_sq + x ** 2, cumsum + x)]) ** .5


summary_stats = summarize_input_stream(
    input_stream=(np.random.randn() * 2 + 3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
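For comparison, here is a rough sketch of the coroutine route hinted at in the question. It only works under the assumption that the summarizers themselves can be rewritten as push-style generators that receive values through send, which the stated constraint (summarizers that pull from an Iterable[float]) does not allow. The names mean_coroutine, std_coroutine, and summarize_input_stream_push are hypothetical and exist only for this illustration.

```python
from typing import Callable, Generator, Iterable, Mapping

# Assumed summarizer shape for this sketch: a generator that yields its
# running result and receives the next input value via .send().
PushSummarizer = Callable[[], Generator[float, float, None]]


def mean_coroutine() -> Generator[float, float, None]:
    total, count, mean = 0.0, 0, float('nan')
    while True:
        x = yield mean  # hand back the running mean, wait for the next value
        total += x
        count += 1
        mean = total / count


def std_coroutine() -> Generator[float, float, None]:
    total, total_sq, count, std = 0.0, 0.0, 0, float('nan')
    while True:
        x = yield std  # hand back the running (population) std
        total += x
        total_sq += x * x
        count += 1
        # Same formula as the question: E[x^2] - E[x]^2, clamped at 0
        # to guard against tiny negative values from rounding.
        std = max(total_sq / count - (total / count) ** 2, 0.0) ** 0.5


def summarize_input_stream_push(
    input_stream: Iterable[float],
    summarizers: Mapping[str, PushSummarizer],
) -> Mapping[str, float]:
    running = {name: make() for name, make in summarizers.items()}
    for co in running.values():
        next(co)  # prime each generator so it is paused at its first yield
    results = {name: float('nan') for name in running}
    for x in input_stream:  # one pass, one value held at a time
        for name, co in running.items():
            results[name] = co.send(x)
    for co in running.values():
        co.close()
    return results


stats = summarize_input_stream_push(
    input_stream=(float(i) for i in range(1, 6)),
    summarizers={'mean': mean_coroutine, 'std': std_coroutine},
)
print(stats)
```

This variant needs no threads or queues, but it trades that simplicity for a different summarizer interface, so it is not a drop-in replacement for the threaded version above.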
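Regarding "Python: how to send an iterator to two different consumers without loading the entire iterator into memory?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/73900617/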