我想在一个计算数据集上编写三个单独的输出,为此我必须缓存/保留我的第一个数据集,否则它将对第一个数据集计算三次,这会增加我的计算时间。
例如
FirstDataset // Get data from kafka;
SecondDataset = FirstDataSet.mapPartitions(Some Calculations);
ThirdDataset = SecondDataset.mapPartitions(Some Calculations);
现在我想过滤我的 ThirdDataset 并输出具有不同逻辑的三种不同条件的过滤后的数据集。
ThirdDataset.filter(**Condition1**).writeStream().foreach(**SOMECALCULATIONS1**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
ThirdDataset.filter(**Condition2**).writeStream().foreach(**SOMECALCULATIONS2**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
ThirdDataset.filter(**Condition3**).writeStream().foreach(**SOMECALCULATIONS3**).outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
现在对于每个写入流 ThirdDataset 正在计算,如果我缓存 ThirdDataset 那么它不会计算三次。
但是当我执行 ThirdDataset.cache()
时,它会出现以下错误,
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
任何人都可以推荐我吗?
最佳答案
使用 foreachbatch 接收器并在数据帧/数据集上进行缓存!
关于apache-spark - 我们如何在 Spark 结构化流 2.4.4 中缓存/保留数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59781907/