apache-spark - 我们如何在 Spark 结构化流 2.4.4 中缓存/保留数据集

标签 apache-spark spark-structured-streaming apache-spark-dataset

我想在一个计算数据集上编写三个单独的输出,为此我必须缓存/保留我的第一个数据集,否则它将对第一个数据集计算三次,这会增加我的计算时间。

例如

FirstDataset // Get data from kafka;

SecondDataset = FirstDataSet.mapPartitions(Some Calculations);

ThirdDataset = SecondDataset.mapPartitions(Some Calculations);

现在我想过滤我的 ThirdDataset 并输出具有不同逻辑的三种不同条件的过滤后的数据集。

ThirdDataset.filter(**Condition1**).writeStream().foreach(**SOMECALCULATIONS1**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(**Condition2**).writeStream().foreach(**SOMECALCULATIONS2**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(**Condition3**).writeStream().foreach(**SOMECALCULATIONS3**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

现在对于每个写入流 ThirdDataset 正在计算,如果我缓存 ThirdDataset 那么它不会计算三次。

但是当我执行 ThirdDataset.cache() 时,它会出现以下错误,

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

任何人都可以推荐我吗?

最佳答案

使用 foreachbatch 接收器并在数据帧/数据集上进行缓存!

关于apache-spark - 我们如何在 Spark 结构化流 2.4.4 中缓存/保留数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59781907/

相关文章:

apache-spark - 如何计算训练模型的对数损失?

apache-spark - 在spark中保存固定大小的parquet输出文件

json - 转换为 Spark 所需的 JSON 格式,以便在 Java 中为数据帧创建架构

apache-spark - Spark 结构化流应用程序中的死亡执行者

java - 根据java中的时间戳按月对spark数据集进行分组

scala - Spark 错误 : Unable to find encoder for type stored in a Dataset

apache-spark - 如何使用pyspark对数据框中的两列进行数学运算

scala - 如何使用 Spark Structured Streaming 将数据从 Kafka 主题流式传输到 Delta 表

scala - 在我停止作业之前,Spark Structured Streaming writestream 不会写入文件

scala - 为什么使用案例类对 JSON 进行编码时会出现错误 "Unable to find encoder for type stored in a Dataset"?