在我的应用程序中,我想将数据从 MongoDB 流式传输到 Java 中的 Spark Streaming。为此,我使用了队列流,因为我认为我可以将 mongodb 数据保留在 rdd 上。但这个方法不起作用或者我做错了什么。
有人从 mongodb 流式传输到 Spark Streaming 吗?是不是我的方法不对,如果是的话,正确的方法是什么?
我的代码在这里
package com.mongodb.spark.stream;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;
import scala.Tuple2;
public class MongoStream {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
conf.set("mongo.input.uri", "mongodb://192.168.1.149:27017/test.observations");
SparkConf spc = new SparkConf().setMaster("local[2]").setAppName("mongo");
JavaStreamingContext sc = new JavaStreamingContext(spc, Durations.seconds(1));
final Queue q = new LinkedList<JavaRDD<String>>();
final JavaPairRDD<Object, BSONObject> rdd = sc.sparkContext().newAPIHadoopRDD(conf, MongoInputFormat.class,
Object.class, BSONObject.class);
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
private static final long serialVersionUID = -5974149698144908239L;
@Override
public Iterable<String> call(Tuple2<Object, BSONObject> arg0) throws Exception {
Object o = arg0._2.get("SensorId").toString();
if (o instanceof String) {
String str = (String) o;
str = str.replace("[.,!?|\n]", " ");
System.out.println(str);
q.add(str.split(""));
System.out.println("dsdssd : " + q);
return Arrays.asList(str.split(" "));
} else
return Collections.emptyList();
}
});
@SuppressWarnings("unchecked")
JavaReceiverInputDStream<String> rec = (JavaReceiverInputDStream<String>) sc.queueStream(q);
}
}
最佳答案
这个方法行不通。 QueueDStream
在每个时间间隔内消耗给定队列中的一个 RDD,因此该过程只会运行一次。
你可以像这样想象它:
QueueDStream(Queue(RDD-time1, RDD-time2, ..., RDD-time-n))
然后在每个流间隔,将处理队列头部的 RDD 实例。如果您使用可变并发队列,进程可能会在队列尾部添加 RDD,而 Spark Streaming 将在每个时间间隔处理头部。
我不知道 Mongo 的流支持,所以您可能需要以其他方式重新考虑这个过程。例如创建您自己的 Custom Receiver
关于java - java中的spark流与mongodb,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31858460/