java - 将 iterable 转换为 RDD

标签 java elasticsearch apache-spark spark-streaming elasticsearch-plugin

我想从 Spark 流保存到几个 Elasticsearch 索引。 我创建了 <key(index), value> 对,当我执行 groupByKey 时,结果是 <key(index), Iterable<value>> 的元组但为了使用elasticsearch-spark插件保存到elasticsearch,我需要的值为 JavaRDD<value> .

我知道sparkContext.parallelize(list)有一个选项可以从列表创建JavaRDD,但这只能在驱动程序上执行。

是否还有其他选项来创建可以在执行器上执行的 JavaRDD?或者我可以实现的另一种方式 Tuple2<key(index), JavaRDD<value>>哪个适用于执行者? 如果不是,我怎样才能只在驱动程序上将迭代器切换到JavaRDD,并在执行器上将插件写入elasticsearch?

谢谢

丹妮拉

最佳答案

我想说,必须有可能有像下面这样的东西

JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2());
JavaRDD<Value> onlyValues = values.flatMap(it -> it);

替代方法是

JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1);
JavaRDD<Value> values = keyValues.map(t2 -> t2._2());

关于java - 将 iterable 转换为 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38823176/

相关文章:

apache-spark - 使用 pyspark 从 S3 读取数据抛出 java.lang.NumberFormatException : For input string: "100M"

java - 三维数组

java - 使用 XStream 中的属性反序列化 XML 文本元素

java - mappedBy 指的是类名还是表名?

php - 使用 php 爬虫索引数据到 ElasticSearch

elasticsearch - 从Kibana移除保存选项

hadoop - HDFS 中的 Avro 架构生成

apache-spark - 如何在转换期间测试数据类型转换

java - 对话框中的 println 带有非拉丁符号

elasticsearch - 从elasticsearch检索多值数组