java - 在 Spark Java 中定义要广播的对象的位置

标签 java apache-spark

我有一个数据库对象,用于从所有 Spark 执行器插入数据。当我将此对象定义为static时,它在这些执行器中具有null值。所以我在驱动程序中声明它,广播它,然后在每个执行程序中获取它的值。当我运行该应用程序时,抛出以下异常:

Exception in thread "main" java.io.NotSerializableException: database.Database

注释:

  • 执行器类是可序列化的
  • 广播对象在该类中被定义为 transient
  • 我删除了 transient ,但它不起作用

最佳答案

我这样解释你的问题:

I want to insert data from my RDD from all Spark executors. I tried to create one DB connection on the Driver and pass it somehow as a Broadcast to the executors, but Spark keeps throwing NotSerializableException. How can I achieve my goal?

简短的回答是:

You should create a new connection on every executor node separately.
You should not pass database connection handlers, file handlers and the likes to other processes and especially remote machines.

这里的问题是到底在哪里创建数据库连接,因为执行器数量较多,很容易超出数据库的连接池大小。

您实际上可以做的是使用 foreachPartition ,就像这里:

  // numPartitions == number of simultaneous DB connections you can afford
  yourRdd.repartition(numPartitions)
  .foreachPartition {
    iter =>
      val connection = createConnection()
      while (iter.hasNext) {
        connection.execute("INSERT ...")
      }
      connection.commit()
  }

这里 .foreachPartition 中的代码将在每个执行器机器上执行,并且连接对象不会通过网络发送,不会出现序列化异常并且数据将被插入。

this 的答案中也提到了使用 foreachPartition 的相同推理。问题。

关于java - 在 Spark Java 中定义要广播的对象的位置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44199403/

相关文章:

java - JVM 中与枚举对象分配相关的困惑

java - 获取在特定线程中执行的方法的名称

apache-spark - Spark BigQuery 连接器 : BaseEncoding$DecodingException: Unrecognized character: 0xa

java - 尝试更新 Access 数据库时出现 NonWritableChannelException

java - 无法写入 JSON : failed to lazily initialize a collection of role: com. Managem.model.Region.pays,无法初始化代理 - 无 session

java - GridBagLayout 中的 JScrollPanes 随机调整大小

scala - 如何在 AWS S3 中保存和使用 Spark History Server 日志

apache-spark - Spark 结构化流应用程序中的死亡执行者

scala - 我们不能在 map 函数中使用 sparkContext 吗?

apache-spark - 具有多个加密 key 提供商的 EMR