java - 在 Spark Streaming 作业中启动一次 JDBC 连接

标签 java apache-spark spark-streaming hazelcast

环境

  • 从 Kafka 读取 Spark Streaming 作业,微批量大小 30 秒 (Durations.seconds (30))
  • 具有引用状态的内存存储 (Hazelcast)。这是一个非静态状态,由 Spark Workers 实时更新
  • Spark Workers 与 Hazelcast 连接

当前方法 - 使用foreachRDD对数据进行操作,并为每个微批处理(RDD)建立连接。这种情况每 30 秒发生一次 (Durations.seconds (30))。

kafkaStream.foreachRDD (new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> microBatch) throws Exception {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("myHost:5701");    //Define connection
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        //Do processing
   }
}

询问:希望在每个 Spark Worker 上打开一次连接(提交作业时),而不是为每个微批处理打开新连接。实现这一目标的正确方法是什么?

最佳答案

这里详细解释了您需要什么: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

在 foreachPartition 中,主体在执行器中本地执行。在那里您可以拥有静态客户端连接(例如,每个工作人员将使用自己的静态对象)

希望对您有所帮助。

谢谢

关于java - 在 Spark Streaming 作业中启动一次 JDBC 连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41505471/

相关文章:

java - DateTimeFormatter 是否可以在不捕获异常的情况下验证日期字符串?

java - 具有值(等于)相等性的 SoftReference 的潜在用途

scala - 持久化 Spark 数据框

apache-spark - Pyspark : Custom window function

apache-spark - 如何在集群模式下提供依赖 jars 来触发提交

java - 查找 DataFrame 中分组数据的比率

java - Cypher - 通过属性中的字符串聚合节点

scala - 如何将 RDD[(Key, Value)] 转换为 Map[Key, RDD[Value]]

scala - Apache Spark和域驱动设计

java - 如何只使用IP和端口访问我的servlet?