我有一个大型数据源,我可以从中逐 block 提取数据。
我不想返回一大块 chunksize
项,而是通过生成器生成单个项,一旦我从一个 block 中生成了所有数据,我就想加载下一个 block 。
目标是一次返回一项,无需加载完整数据源,也无需从数据源中一项一项地提取项。
这是一些伪代码:
def get_data_chunk(datasource, chunksize=10):
# grab chunksize elements of datasource
return data_chunk # a list of dict, usually
def generator(datasource):
data_chunk = get_data_chunk(datasource)
for item in data_chunk:
yield item
# if no more item in data_chunk, reload from get_data_chunk
# until datasource does not return anything
我尝试使用一个队列,一旦空了就重新填充,但没有成功。
最佳答案
您有两个选择:
使用
while True:
循环并在下一个 block 为空时退出:def generator(datasource): while True: data_chunk = get_data_chunk(datasource) if not data_chunk: return for item in data_chunk: yield item
使用
iter()
function 的两个参数形式在for
循环中:def generator(datasource): for data_chunk in iter(lambda: get_data_chunk(datasource), None): for item in data_chunk: yield item
或者,使用
itertools.chain.from_iterable()
:from itertools import chain def generator(datasource): chunk_iter = iter(lambda: get_data_chunk(datasource), None) yield from chain.from_iterable(chunk_iter)
后者要求您知道“最终值”是什么样的。在上面,我假设最终值为 None
,但如果它是空列表,则需要将 None
替换为 []
.
演示:
>>> from itertools import chain, islice
>>> from random import randrange
>>> demosource = (randrange(11, 81) for _ in range(17))
>>> def get_data_chunk(datasource, chunksize=10):
... return list(islice(datasource, chunksize))
...
>>> def generator(datasource):
... chunk_iter = iter(lambda: get_data_chunk(datasource), []) # last chuck is an empty list
... yield from chain.from_iterable(chunk_iter)
...
>>> list(generator(demosource))
[38, 47, 74, 13, 23, 24, 47, 61, 30, 38, 70, 41, 43, 47, 37, 34, 67]
关于python - 如何从多个项目的缓冲区中生成单个项目并定期重新填充缓冲区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55080684/