mysql - Spark 中的 mysql 查询序列

标签 mysql apache-spark

我在 Spark 中有一个要求,我需要从 mysql 实例中获取数据,并在经过一些处理后使用来自不同 mysql 数据库的更多数据来丰富它们。

但是,当我尝试从 map 函数内部再次访问数据库时,我得到一个

org.apache.spark.SparkException: Task not serializable
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)     
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
org.apache.spark.SparkContext.clean(SparkContext.scala:2094)

我的代码是这样的:

val reader = sqlContext.read;
initialDataset.map( r => reader.jdbc(jdbcUrl, "(select enrichment_data from other_table where id='${r.getString(1)'}) result", connectionProperties).rdd.first().get(0).toString )

有什么想法/建议吗?我应该使用两个不同的数据集吗?谢谢!

最佳答案

首先,map() 函数应该接受来自现有 RDD 的一行,然后应用您所做的更改并返回更新后的行。这就是你得到这个异常的原因,因为 scala 无法序列化代码 reader.jdbc(jdbcUrl, ...

要解决您的问题,您可以根据需要有多种选择:

  1. 您可以在收集这些数据集后广播其中一个数据集。通过广播,您的数据集将存储到节点的内存中。如果这个数据集相当小以适合节点的内存,这可能会起作用。然后你可以查询它并将结果与​​第二个数据集组合

  2. 如果两个数据集都很大并且不适合将它们加载到节点内存中,那么请使用 mapPartition,您可以找到有关 mapPartition 的更多信息 here . mapPartition 是按分区调用的,而不是像 map() 那样按元素调用的。如果选择此选项,则可以从 mapPartition 访问第二个数据集,甚至可以从 mapPartition 初始化整个数据集(例如,从第二个数据库中检索所有相关记录)。

请注意,我假设这两个数据集确实具有某种依赖性(例如,在执行下一步之前,您需要从第二个数据库访问某些值)。如果他们不这样做,那么只需创建 ds1 和 ds2 并像处理任何数据集一样正常使用它们。最后,如果您确定可能需要多次访问数据集,请记住缓存数据集。

祝你好运

关于mysql - Spark 中的 mysql 查询序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49231944/

相关文章:

java - 为什么阿拉伯字母没有插入数据库?

mysql - 在mysql查询中使用两个内连接

java - Jersey + Spark javax.ws.rs.core.UriBuilder.uri

apache-spark - persist(DISK_ONLY) 与手动保存到 HDFS 和回读之间的区别

python - 我需要在 Spark 集群的每个节点上安装 Koalas 还是只在主节点上安装?

mysql - 如何在 sql 脚本中而不是在存储过程中选择性地执行某些 sql 语句

PHP, MySQL 查询

ubuntu - Apache Spark : "failed to launch org.apache.spark.deploy.worker.Worker" or Master

java - 为什么插入照片时数据库中的值为空?

python - pyspark MLUtils saveaslibsvm 仅保存在 _temporary 下,而不保存在 master 上