mongodb - MongoDB 和 Spark 中的连接过多

标签 mongodb apache-spark apache-kafka spark-streaming

我的 Spark Streaming 应用程序将数据存储在 MongoDB 中。

不幸的是,每个 Spark worker 在将其存储在 MongoDB 中时打开了太多连接

enter image description here

以下是我的代码 Spark - Mongo DB 代码:

public static void main(String[] args) {

    int numThreads = Integer.parseInt(args[3]);
    String mongodbOutputURL = args[4];
    String masterURL = args[5];

    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);

//    Create a Spark configuration object to establish connection between the application and spark cluster
    SparkConf sparkConf = new SparkConf().setAppName("AppName").setMaster(masterURL);

    // Configure the Spark microbatch with interval time
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60*1000));

    Configuration config = new Configuration();
    config.set("mongo.output.uri", "mongodb://host:port/database.collection");

//    Set the topics that should be consumed from Kafka cluster
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
      topicMap.put(topic, numThreads);
    }

//    Establish the connection between kafka and Spark
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaPairDStream<Object, BSONObject> save = lines.mapToPair(new PairFunction<String, Object, BSONObject>() {
        @Override
        public Tuple2<Object, BSONObject> call(String input) {
            BSONObject bson = new BasicBSONObject();
            bson.put("field1", input.split(",")[0]);
            bson.put("field2", input.split(",")[1]);
            return new Tuple2<>(null, bson);
        }
    });
    // Store the records in database    
    save.saveAsNewAPIHadoopFiles("prefix","suffix" ,Object.class, Object.class, MongoOutputFormat.class, config);

    jssc.start();
    jssc.awaitTermination();
  }

如何控制每个worker的连接数?

我是否缺少任何配置参数?

更新 1:

我正在使用带有 Java API 的 Spark 1.3。

我无法执行 coalesce() 但我能够执行 repartition(2) 操作。

现在没有连接受到控制。

但我认为连接没有被关闭或没有在 worker 中重用。

请看下面的截图:

流媒体间隔 1 分钟和 2 个分区 enter image description here

最佳答案

您可以尝试映射分区,它在分区级别而不是记录级别工作,即,在一个节点上执行的任务将共享一个数据库连接而不是每条记录。

另外我猜你可以使用预分区(而不是流 RDD)。 Spark 足够聪明,可以利用它来减少随机播放。

关于mongodb - MongoDB 和 Spark 中的连接过多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34437561/

相关文章:

c# - MongoDB C# - LINQ 包含针对字符串数组抛出 ArgumentException

scala - 分解匹配列

python - 如何添加第三方 Java JAR 文件以在 PySpark 中使用

apache-kafka - Kafka 控制台生产者无法连接到代理

node.js - Mongoose 在引用的文档属性中查找

javascript - Mongoose 按 _id 查找和更新

node.js - Mongoose - 如何从对象 ID 数组中查找集合

python - 将python依赖提交到spark集群

java - Kafka设置从主题读取的最大消息数

python - 如何获取 kafka 主题分区的最新偏移量?