python - Spark Structured Streaming - 新批处理上的空字典

标签 python apache-spark dictionary pyspark spark-structured-streaming

在我的构造函数中,我初始化了一个空字典,然后在 udf 中,我用来自批处理的新数据更新了它。

我的问题是,在每一个新的批处理中,字典又是空的。

如何绕过空步骤,以便新批处理可以访问我已经添加到字典中的所有先前值?

import CharacteristicVector
import update_charecteristic_vector

class SomeClass(object):

    def __init__(self):
        self.grid_list = {}

    def run_stream(self):   

        def update_grid_list(grid):
            if grid not in self.grid_list:
                grid_list[grid] = 
            if grid not in self.grid_list:
                self.grid_list[grid] = CharacteristicVector()
            self.grid_list[grid] = update_charecteristic_vector(self.grid_list[grid])
            return self.grid_list[grid].Density
        .
        .
        .

        udf_update_grid_list = udf(update_grid_list, StringType())
        grids_dataframe = hashed.select(
            hashed.grid.alias('grid'),
            update_list(hashed.grid).alias('Density')
        )

        query = grids_dataframe.writeStream.format("console").start()
        query.awaitTermination()

最佳答案

不幸的是,由于多种原因,此代码无法正常工作。即使在单个批处理或批处理应用程序中,它也只有在只有事件的 Python 工作进程时才能工作。此外,通常不可能拥有全局同步统计信息,同时支持读取和写入。

您应该能够使用 stateful transformations , 但目前,仅在 Java/Scala 中受支持,并且接口(interface)仍处于实验性/不断发展中。

根据您的要求,您可以尝试使用内存数据网格、键值存储或分布式缓存。

关于python - Spark Structured Streaming - 新批处理上的空字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48131718/

相关文章:

python - 加载共享库 : libssl. so.0.9.8 时出现 django runserver 错误:无法打开共享对象文件:没有这样的文件或目录

python - 使用 Pandas 检查 csv 文件中的缺失值

json - 使用 Log4j 在日志中输出 Spark 应用程序 ID

c# - 使用字典中的键获取值

python - json 转储 TypeError : keys must be a string with dict

python - Matplotlib:用鼠标画出矩形形状的选区

python - 元组到日期时间

apache-spark - Apache Spark 警告 "Calling spill() on RowBasedKeyValueBatch"的含义

hadoop - spark如何写入HBASE

python - 提取每个键的值是字典的max min