java - 如何使用RDD持久化和缓存?

标签 java apache-spark spark-streaming

请告诉我如何使用 RDD 方法 Persist() 和 Cache(),这似乎是我通常用 java 编写的常规程序,比如 sparkStreaming,它是 DAG 的持续执行,其中每次的值RDD 将得到更新,因此 perist/cache 也将被一次又一次地调用,并导致覆盖该 RDD。

但正如下面的文档所示,这些方法似乎仅对交互式 shell 有用,或者与仅将所需的 RDD 存储在任何引用变量。

Spark Doc Link

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

对比

在连续的 spark-streaming 作业中,我认为这是非常相同的,不会一次又一次地评估。

JavaRDD sortedRDD =baseRDD.filter(f(x));

sortedRDD.count();
sortedRDD.saveAsNewHadoopAPIFile();
// Or Anything we want !   

如果您能帮助解决这个疑问,我将不胜感激。

最佳答案

Spark 中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当你持久化一个 RDD 时,每个节点存储它在内存中计算的任何分区,并在对该数据集(或从它派生的数据集)的其他操作中重用它们。这允许 future 的行动更快(通常超过 10 倍)。缓存是迭代算法和快速交互使用的关键工具。 您可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。第一次在 Action 中计算时,它将保存在节点的内存中。 Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。 cache() 方法是使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)。

val linesWithSpark  = sc.textFile("/home/kishore/test.txt")
linesWithSpark.cache()
linesWithSpark.count()

它什么都不做。 RDD.cache 也是一个惰性操作。该文件仍未读取。但是现在 RDD 说“读取这个文件然后缓存内容”。如果您随后第一次运行 linesWithSpark.count,文件将被加载、缓存和计数。如果您第二次调用 linesWithSpark.count,该操作将使用缓存。它只会从缓存中获取数据并计算行数。

关于java - 如何使用RDD持久化和缓存?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31002975/

相关文章:

apache-spark - 将 Spark 应用程序与 Web 服务器连接

pyspark - 如何使用具有 Spark 数据流结构的非基于时间的窗口?

scala - 使用 Spark StreamingContext 从 Kafka 主题消费

hadoop - yarn : yarn-site. xml 更改未生效

java - 尝试使用 Thymeleaf 处理模板时出错

c# - 单个应用程序中的 SOAP 服务和 JSF/Icefaces

java - AddFolderListener 在某些设备中有效,但在其他设备中无效

java - 如何在javafx中右对齐换行文本

python - 连接操作后如何避免重复列?

python - Pyspark:根据条件和不同的值添加新列