在我的构造函数中,我初始化了一个空字典,然后在 udf 中,我用来自批处理的新数据更新了它。
我的问题是,在每一个新的批处理中,字典又是空的。
如何绕过空步骤,以便新批处理可以访问我已经添加到字典中的所有先前值?
import CharacteristicVector
import update_charecteristic_vector
class SomeClass(object):
def __init__(self):
self.grid_list = {}
def run_stream(self):
def update_grid_list(grid):
if grid not in self.grid_list:
grid_list[grid] =
if grid not in self.grid_list:
self.grid_list[grid] = CharacteristicVector()
self.grid_list[grid] = update_charecteristic_vector(self.grid_list[grid])
return self.grid_list[grid].Density
.
.
.
udf_update_grid_list = udf(update_grid_list, StringType())
grids_dataframe = hashed.select(
hashed.grid.alias('grid'),
update_list(hashed.grid).alias('Density')
)
query = grids_dataframe.writeStream.format("console").start()
query.awaitTermination()
最佳答案
不幸的是,由于多种原因,此代码无法正常工作。即使在单个批处理或批处理应用程序中,它也只有在只有事件的 Python 工作进程时才能工作。此外,通常不可能拥有全局同步统计信息,同时支持读取和写入。
您应该能够使用 stateful transformations , 但目前,仅在 Java/Scala 中受支持,并且接口(interface)仍处于实验性/不断发展中。
根据您的要求,您可以尝试使用内存数据网格、键值存储或分布式缓存。
关于python - Spark Structured Streaming - 新批处理上的空字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48131718/