apache-spark - 如何从迭代器创建Spark RDD?

标签 apache-spark spark-streaming

为了清楚起见,我不是从数组/列表中寻找RDD

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);


如何在不完全将其缓存在内存中的情况下从Java迭代器创建spark RDD?

Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question


附加问题:

是否要求源可重新读取(或能够多次读取)以提供RDD的弹性?换句话说,由于迭代器基本上是一次读取的,因此甚至有可能从迭代器创建弹性分布式数据集(RDD)吗?

最佳答案

就像其他人说的那样,您可以使用Spark Streaming做些什么,但是就纯Spark而言,您做不到,其原因是您要问的内容与Spark的模型背道而驰。让我解释。
为了分配和并行化工作,spark必须将其分成多个部分。从HDFS读取数据时,HDFS会对Spark进行“分块”操作,因为HDFS文件是按块组织的。 Spark通常每个块生成一个任务。
现在,迭代器仅提供对数据的顺序访问,因此spark无法在不读取内存中所有内容的情况下将其组织成块。

可以构建具有单个可迭代分区的RDD,但是即使这样,也无法确定是否可以将Iterable的实现发送给工作人员。使用sc.parallelize()时,spark将创建实现serializable的分区,以便可以将每个分区发送到不同的工作程序。可迭代可能是通过网络连接,也可能是本地FS中的文件,因此除非将它们缓冲在内存中,否则它们无法发送给工作线程。

关于apache-spark - 如何从迭代器创建Spark RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31072893/

相关文章:

json - 如何在 Spark 2 Scala 中将 Row 转换为 json

scala - Spark UDAF : How to get value from input by column field name in UDAF (User-Defined Aggregation Function)?

scala - spark/scala 在任何列中删除带有 nan 的行

python - 在 PySpark 中使用微秒时间戳

java - Spark UI 的存储选项卡下的 "Size in Memory"显示随着时间的推移,Spark 流的 RAM 使用量增加

java - 未找到类 Spark Streaming 和 kafka

apache-spark - Spark :2. 0.2 java.util.ConcurrentModificationException:KafkaConsumer 对于多线程访问不安全

java - 将 Json 的 Dataset 列解析为 Dataset<Row>

java - Spark 作业中的静态变量-Java

apache-spark - 从缓存中删除 Spark 数据帧