elasticsearch - 从Kafka到Spark的流式传输到 Elasticsearch 索引

标签 elasticsearch apache-spark apache-kafka spark-streaming

我正在尝试使用Spark Streaming将Kafka输入索引到elasticsearch中。

kafka中的消息是这样的:

“汤姆34快乐巴黎”

我想在Spark Streaming中定义结构,以便在elasticsearch中索引此消息:

{名称:“汤姆”,
年龄:34岁
状态:“快乐”,
城市:“巴黎}

我已经阅读了有关RDD转换的信息,但是找不到如何定义值的键。

我需要你的帮助。

在我的代码下面,该代码仅对从Kafka收到的消息进行字数统计:

package com.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._


object MainExample {

def main(arg: Array[String]) {

var logger = Logger.getLogger(this.getClass())

val jobName = "MainExample"

val conf = new SparkConf().setAppName(jobName)    
val ssc = new StreamingContext(conf, Seconds(2))

val zkQuorum = "localhost:2181"
val group = ""
val topics = "test"
val numThreads = 1

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map( x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
 }
}

最佳答案

JavaRDD<Map<String,String>> result = input.map(new Function<String, Map<String, String>>() {
        @Override
        public Map<String, String> call(String v1) throws Exception {
            Map<String, String> ret = new HashMap<>();
            int i=0;
            for(String val : v1.split(" ")){
                ret.put("key"+i++, val);
            }
            return ret;
        }           
    });

关于elasticsearch - 从Kafka到Spark的流式传输到 Elasticsearch 索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35302005/

相关文章:

python - Django Haystack Elasticsearch : order by position of matched term

hadoop - Spark - 如何按键计算记录数

apache-kafka - 使用高级 API 从特定偏移量开始读取 kafka 消息

java - 卡夫卡 : disable create topic from Java

Elasticsearch 枚举字段

java - ElasticSearch 和 Apache HttpAsyncClient

apache-spark - 调用 o67.load : java. lang.NoClassDefFoundError: org/apache/hadoop/fs/staging/StagingDirectoryCapable 时出错

apache-kafka - 我正在评估 Google Pub/Sub 与 Kafka。有什么区别?

elasticsearch - 如何在Elasticsearch上省略空格

hadoop - 使用pyspark从hdfs读取文件时拒绝连接