我是 spark streaming 和 elasticsearch 的新手,我正在尝试使用 spark 从 kafka 主题读取数据并将数据存储为 rdd。在 rdd 中我想附加时间戳,一旦新数据到来然后推送到 elasticsearch。
lines.foreachRDD(rdd -> {
if(!rdd.isEmpty()){
// rdd.collect().forEach(System.out::println);
String timeStamp = new
SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
List<String> f = rdd.collect();
Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);
JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));
JavaEsSpark.saveToEs(javaRDD, "sample/docs");
}
});
最佳答案
Spark ?
据我了解,spark streaming 用于实时流数据计算,如 map
、reduce
、join
和 窗口
。好像没必要用这么强大的工具,我们需要的只是给事件加个时间戳。
Logstash?
Logstash 会在事件到来时记录时间戳,它还有persistent queue。和 Dead Letter Queues确保数据弹性。原生支持向ES推送数据(毕竟属于系列产品),推送数据非常方便。
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
}
}
更多
希望对您有所帮助。
引用
关于java - 如何将时间戳附加到 rdd 并推送到 elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46945441/