我有一个数据库对象,用于从所有 Spark 执行器插入数据。当我将此对象定义为static
时,它在这些执行器中具有null
值。所以我在驱动程序中声明它,广播它,然后在每个执行程序中获取它的值。当我运行该应用程序时,抛出以下异常:
Exception in thread "main" java.io.NotSerializableException: database.Database
注释:
- 执行器类是可序列化的
- 广播对象在该类中被定义为 transient
- 我删除了 transient ,但它不起作用
最佳答案
我这样解释你的问题:
I want to insert data from my RDD from all Spark executors. I tried to create one DB connection on the Driver and pass it somehow as a Broadcast to the executors, but Spark keeps throwing
NotSerializableException
. How can I achieve my goal?
简短的回答是:
You should create a new connection on every executor node separately.
You should not pass database connection handlers, file handlers and the likes to other processes and especially remote machines.
这里的问题是到底在哪里创建数据库连接,因为执行器数量较多,很容易超出数据库的连接池大小。
您实际上可以做的是使用 foreachPartition ,就像这里:
// numPartitions == number of simultaneous DB connections you can afford
yourRdd.repartition(numPartitions)
.foreachPartition {
iter =>
val connection = createConnection()
while (iter.hasNext) {
connection.execute("INSERT ...")
}
connection.commit()
}
这里 .foreachPartition
中的代码将在每个执行器机器上执行,并且连接对象不会通过网络发送,不会出现序列化异常并且数据将被插入。
在 this 的答案中也提到了使用 foreachPartition
的相同推理。问题。
关于java - 在 Spark Java 中定义要广播的对象的位置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44199403/