java - Spark RDD- map 与 mapPartitions

标签 java scala apache-spark garbage-collection

我通读了 map 和 mapPartitions 之间的理论差异,并且很清楚何时在各种情况下使用它们。

但我下面描述的问题更多是基于 GC Activity 和内存 (RAM)。请阅读下面的问题:-

=> 我写了一个映射函数来将 Row 转换为 String。因此,RDD[org.apache.spark.sql.Row] 的输入将映射到 RDD[String]。但是使用这种方法,将为 RDD 的每一行创建映射对象。因此,创建如此大量的对象可能会增加 GC Activity 。

=> 为了解决上面的问题,我想到了使用 mapPartitions。因此,对象的数量等于分区的数量。 mapPartitions 将 Iterator 作为输入并接受返回和 java.lang.Iterable。但大多数 Iterable,如 Array、List 等都在内存中。那么,如果我有大量数据,那么以这种方式创建 Iterable 会导致内存不足吗?或者是否有任何其他集合(java 或 scala)应该在这里使用(在内存开始填满时溢出到磁盘)?或者我们应该只在 RDD 完全在内存中时才使用 mapPartitions 吗?

提前致谢。任何帮助将不胜感激。

最佳答案

如果您考虑 JavaRDD.mapPartitions,它需要 FlatMapFunction(或类似 DoubleFlatMapFunction 的变体),预计会返回 Iterator 不是可迭代。如果底层集合是惰性的,那么您无需担心。

RDD.mapPartitions 接受从 IteratorIterator 的函数。

我一般来说,如果您使用引用数据,您可以将 mapPartitions 替换为 map 并使用静态成员来存储数据。这将具有相同的占用空间并且更易于编写。

关于java - Spark RDD- map 与 mapPartitions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40910401/

相关文章:

java - 关于 Grails map 的帮助

java - 如何从运行时加载的类中调用构造函数? java

java - 如何从 ByteBuffer 读取无符号 double 值

android - Scala 内部类 - 不是成员(member)

scala - 创建单一类型对象的列表

apache-spark - 将结构数组扩展为PySpark中的列

java - 使用 CXF 拦截器进行错误处理 - 更改响应消息

scala - 在键上加入 Spark 数据帧

python - PySpark 根据列条件删除重复项

python - Java 网关进程在向驱动程序发送其端口号之前退出