Hello, I am new to Apache Spark and still learning. I have written a Spark Streaming job that consumes JSON data from a Kafka topic; one continuously streamed record is shown below. I don't know how to process this JSON data — I tried using Dataset and DataFrame, but ran into errors. Please give me a few examples of how I can process the data coming off the stream.
Note: I am using Apache Spark version 1.6.3.
(null{"time":"2017/08/21 18:25:11","model":"20E84fb","speed":"20E84fb","cellId":"0605d822E84fb","course":"146.37E84fb","header":"ST600ALTE84fb","deviceId":206675884,"distance":"166E84fb","longitude":"-099.168493E84fb","latitude":"19.428616E84fb","payload":"ST600ALT+number+;206675884;20;376;20161005;16:26:59;0605d822;334;20;2ee5;63;+19.428616;-099.168493;000.213;146.37;6;1;166;12.21;000000;34;000887;4.4;1;0.00E84fb","date":"2017/08/21 18:25:11E84fb"})
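Note that each record arrives wrapped in a `(null ... )` prefix/suffix, so it is not valid JSON as-is and a JSON parser will reject it. A minimal plain-Java sketch of unwrapping one record before parsing — assuming every record is wrapped the same way; the class name `RecordCleaner` is made up for illustration:

```java
public class RecordCleaner {
    // Strip the "(null" prefix and ")" suffix that wrap each Kafka record,
    // leaving a plain JSON object string that a JSON parser can accept.
    public static String unwrap(String record) {
        String s = record.trim();
        if (s.startsWith("(null")) {
            s = s.substring("(null".length());
        }
        if (s.endsWith(")")) {
            s = s.substring(0, s.length() - 1);
        }
        return s;
    }

    public static void main(String[] args) {
        String raw = "(null{\"deviceId\":206675884,\"speed\":\"20E84fb\"})";
        System.out.println(RecordCleaner.unwrap(raw));
        // prints {"deviceId":206675884,"speed":"20E84fb"}
    }
}
```

A function like this could be applied inside the `map` over the DStream, before the records ever reach a DataFrame.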
Code:
package datapipeline;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.onosproject.net.Device;

import scala.Tuple2;

public final class SparkConsumer {

    //private static SparkContext sc = new SparkContext();
    private static final Pattern SPACE = Pattern.compile(" ");

    private static void setLogLevels() {
        boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements();
        if (!log4jInitialized) {
            // We first log something to initialize Spark's default logging, then we override the
            // logging level.
            Logger.getLogger(SparkConsumer.class).info("Setting log level to [WARN] for streaming example."
                    + " To override add a custom log4j.properties to the classpath.");
            Logger.getRootLogger().setLevel(Level.WARN);
        }
    }

    public static void main(String[] args) throws Exception {
        String jars[] = {"C:\\DeviceStreaming-1.0.0.jar"};
        setLogLevels();

        SparkConf sparkConf = new SparkConf().setAppName("CustomerKafkaConsumerThread")
                .set("spark.local.ip", "localhost:9092")
                .setMaster("local[*]")
                .setJars(jars);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));
        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf));
        SQLContext sqlContext = new SQLContext(ctx);

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("iot", 10);

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
                jssc, "localhost:2181", "SparkConsumer", topicMap, StorageLevel.MEMORY_ONLY());
        messages.print();

        // Keep only the message value (the JSON string) from each (key, value) pair.
        JavaDStream<String> json = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> message) {
                        return message._2();
                    }
                }
        );

        json.foreachRDD(rdd -> {
            //DataFrame df = sqlContext.read().json(rdd);
            DataFrame df = sqlContext.createDataFrame(rdd, Device.class);
            df.registerTempTable("rdd");
            df.filter("cellId");
            /*DataFrame deviceFrame = sqlContext.sql("SELECT cellID FROM rdd where cellId=206675884");
            df.show();
            df.printSchema();
            List<String> list = deviceFrame.javaRDD().map(row -> row.getString(0)).collect();*/
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
Best answer
You can use the get_json_object function to extract data from the JSON. From its documentation:
Extracts json object from a json string based on json path specified, and returns json string of the extracted json object. It will return null if the input json string is invalid.
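For instance, on the record above, get_json_object(new Column("raw_json"), "$.deviceId") would yield the string 206675884. As a rough plain-Java illustration of what that top-level lookup returns — this is not Spark's implementation, just the idea, and it only handles flat, unnested fields:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonPathDemo {
    // Illustrates what get_json_object does for a simple top-level path
    // like "$.deviceId": it returns the field's value as a string, or
    // null when the field is absent. Spark uses a real JSON parser
    // internally; this regex sketch is only a flat-field approximation.
    public static String topLevelField(String json, String name) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(name) + "\"\\s*:\\s*(\"([^\"]*)\"|[^,}\\s]+)");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            return null;
        }
        // group 2 is set for quoted string values, group 1 for bare values
        return m.group(2) != null ? m.group(2) : m.group(1);
    }

    public static void main(String[] args) {
        String json = "{\"deviceId\":206675884,\"speed\":\"20E84fb\"}";
        System.out.println(JsonPathDemo.topLevelField(json, "deviceId")); // 206675884
        System.out.println(JsonPathDemo.topLevelField(json, "speed"));    // 20E84fb
    }
}
```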
Try something like the following:
df.withColumn("parsed", functions.from_json(new Column("raw_json"), schema)).printSchema();
(here schema is a StructType describing the JSON, and raw_json is the column holding the raw string)
That should show you the schema it produces, and from there you can run whatever operations you need.
Edit: this may not be the best option on 1.6.3 — from_json was only added in Spark 2.1, so you may need a UDF to operate on the parsed objects.
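On 1.6.3 specifically, one sketch of the UDF route is to parse each micro-batch with sqlContext.read().json() (the approach already commented out in the question's foreachRDD) and clean the "E84fb"-suffixed values with a registered UDF. The UDF name stripSuffix and the selected column names are assumptions here, and this fragment presumes the sqlContext and json DStream from the question's main method:

```java
// Sketch only, for Spark 1.6.x. Register a UDF that removes the stray
// "E84fb" suffix, then query the per-batch DataFrame through SQL.
sqlContext.udf().register("stripSuffix",
        (String v) -> v == null ? null : v.replace("E84fb", ""),
        DataTypes.StringType);

json.foreachRDD(rdd -> {
    if (rdd.isEmpty()) {
        return; // read().json() on an empty RDD yields an empty schema
    }
    DataFrame df = sqlContext.read().json(rdd); // infers the schema per batch
    df.registerTempTable("devices");
    DataFrame cleaned = sqlContext.sql(
            "SELECT deviceId, stripSuffix(speed) AS speed, "
            + "stripSuffix(latitude) AS latitude FROM devices");
    cleaned.show();
});
```

This requires an import of org.apache.spark.sql.types.DataTypes, and it only works once the records have been unwrapped into valid JSON strings.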
Regarding "java - how to process JSON data from Apache Spark Streaming in Java", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45840208/