java - 如何将时间戳附加到 rdd 并推送到 elasticsearch

标签 java elasticsearch time apache-kafka spark-streaming

我是 spark streaming 和 elasticsearch 的新手,我正在尝试使用 spark 从 kafka 主题读取数据并将数据存储为 rdd。在 rdd 中我想附加时间戳,一旦新数据到来然后推送到 elasticsearch。

lines.foreachRDD(rdd -> {
        if(!rdd.isEmpty()){
        // rdd.collect().forEach(System.out::println);
        String timeStamp = new 
        SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
        List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
        List<String> f = rdd.collect();


        Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
        Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);

        JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));

        JavaEsSpark.saveToEs(javaRDD, "sample/docs");
        }
    });

最佳答案

Spark ?

据我了解,spark streaming 用于实时流数据计算,如 mapreducejoin窗口。好像没必要用这么强大的工具,我们需要的只是给事件加个时间戳。

Logstash?

如果是这种情况,Logstash 可能更适合我们的情况。 kafka and ES

Logstash 会在事件到来时记录时间戳,它还有persistent queue。和 Dead Letter Queues确保数据弹性。原生支持向ES推送数据(毕竟属于系列产品),推送数据非常方便。

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}

更多

  • 有关 Logstash 的更多信息,here是介绍。
  • here是一个示例 logstash 配置文件。

希望对您有所帮助。

引用

关于java - 如何将时间戳附加到 rdd 并推送到 elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46945441/

相关文章:

java - 使用 JSON for OData 时不会返回导航链接(使用 Apache Olingo for Java)

elasticsearch - 在Elastic Search中有选择地关闭停用词

elasticsearch - Analyze API不适用于Elasticsearch 1.7

在c中检查时间

java - Android RecyclerView 内的relativelayout 无法正确滚动

java - Minecraft 插件游戏模式转换器 GUI

java - NFC 权限错误

elasticsearch - Logstash 日志被读取但不会推送到elasticsearch

javascript - 保存/编辑范围时显示当前日期(angularjs)

ruby - 使用正则表达式将 12 小时制转换为 24 小时制