java - java中的spark流与mongodb

标签 java mongodb apache-spark spark-streaming

在我的应用程序中,我想将数据从 MongoDB 流式传输到 Java 中的 Spark Streaming。为此,我使用了队列流,因为我认为我可以将 mongodb 数据保留在 rdd 上。但这个方法不起作用或者我做错了什么。

有人从 mongodb 流式传输到 Spark Streaming 吗?是不是我的方法不对,如果是的话,正确的方法是什么?

我的代码在这里

package com.mongodb.spark.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;

import scala.Tuple2;

public class MongoStream {

public static void main(String[] args) {

    Configuration conf = new Configuration();
    conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
    conf.set("mongo.input.uri", "mongodb://192.168.1.149:27017/test.observations");

    SparkConf spc = new SparkConf().setMaster("local[2]").setAppName("mongo");

    JavaStreamingContext sc = new JavaStreamingContext(spc, Durations.seconds(1));

    final Queue q = new LinkedList<JavaRDD<String>>();

    final JavaPairRDD<Object, BSONObject> rdd = sc.sparkContext().newAPIHadoopRDD(conf, MongoInputFormat.class,
            Object.class, BSONObject.class);

    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {

        private static final long serialVersionUID = -5974149698144908239L;

        @Override
        public Iterable<String> call(Tuple2<Object, BSONObject> arg0) throws Exception {

            Object o = arg0._2.get("SensorId").toString();
            if (o instanceof String) {
                String str = (String) o;
                str = str.replace("[.,!?|\n]", " ");
                System.out.println(str);

                q.add(str.split(""));
                System.out.println("dsdssd : " + q);
                return Arrays.asList(str.split(" "));

            } else
                return Collections.emptyList();

        }
    });

    @SuppressWarnings("unchecked")
    JavaReceiverInputDStream<String> rec = (JavaReceiverInputDStream<String>) sc.queueStream(q);

}

}

最佳答案

这个方法行不通。 QueueDStream 在每个时间间隔内消耗给定队列中的一个 RDD,因此该过程只会运行一次。

你可以像这样想象它:

QueueDStream(Queue(RDD-time1, RDD-time2, ..., RDD-time-n))

然后在每个流间隔,将处理队列头部的 RDD 实例。如果您使用可变并发队列,进程可能会在队列尾部添加 RDD,而 Spark Streaming 将在每个时间间隔处理头部。

我不知道 Mongo 的流支持,所以您可能需要以其他方式重新考虑这个过程。例如创建您自己的 Custom Receiver

关于java - java中的spark流与mongodb,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31858460/

相关文章:

java - 无法将 Spring Data MongoDB + Spring DataJPA 与 SpringBoot 一起使用

java - Struts 表单为空

java - 无法将 CryptoPermission 添加到 java.policy 文件

javascript - 如何使用 MongoDB 更改数组的顺序?

java - Spark 因 SerializedLambda 的 ClassNotFoundException 失败

scala - SparkSQL时间戳查询失败

scala - 如何从余弦相似度矩阵中获取项目ID?

java - handleStopActivity 中的 NullPointerException -- 在堆栈跟踪中没有引用我的代码

Java资源-游戏模拟

Java/MongoDB searchQuery 更改日期