我想从 Spark 流保存到几个 Elasticsearch 索引。
我创建了 <key(index), value>
对,当我执行 groupByKey 时,结果是 <key(index), Iterable<value>>
的元组但为了使用elasticsearch-spark插件保存到elasticsearch,我需要的值为 JavaRDD<value>
.
我知道sparkContext.parallelize(list)有一个选项可以从列表创建JavaRDD,但这只能在驱动程序上执行。
是否还有其他选项来创建可以在执行器上执行的 JavaRDD?或者我可以实现的另一种方式 Tuple2<key(index), JavaRDD<value>>
哪个适用于执行者?
如果不是,我怎样才能只在驱动程序上将迭代器切换到JavaRDD,并在执行器上将插件写入elasticsearch?
谢谢
丹妮拉
最佳答案
我想说,必须有可能有像下面这样的东西
JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2());
JavaRDD<Value> onlyValues = values.flatMap(it -> it);
替代方法是
JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1);
JavaRDD<Value> values = keyValues.map(t2 -> t2._2());
关于java - 将 iterable 转换为 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38823176/