mysql - 在spark中将数据保存到MySQL后DataFrame变空

标签 mysql scala apache-spark

我想将数据保存到MySQL中,覆盖某些字段中的重复行,并将待处理数据中不包含的数据保留在MySQL中。我试过Mode.Overwrite/Mode.append,还是不能满足我的需求。所以我尝试从 MySQL 加载现有数据并找到这些行。但是在将数据存入MySQL时,得到的DataFrame变成空的。

在这个过程中,我尝试了两种方法:

  1. 找到pending数据中不存在的数据,然后用UNION将两部分拼接起来。最后使用Mode.Overwrite保存。
  2. 找出pending数据中不存在的数据。使用 Mode.Overwrite 保存挂起的 DataFrame,使用 Mode.append 保存获取的 DF。

这两种方法都不可用。方法一保存或方法二Mode.OverWrite保存后得到的DF一直为空。

代码如下:

var mysql_table = spark.sqlContext.read.format("jdbc").options(jdbc_options).load()    
val list = pre_res.select("clientMacAddr").rdd.map(x => x.toString.substring(1,18)).collect()    
val rec_diff = mysql_table.filter(x => !(list.contains(x.apply(0).toString)))  
pre_res.write.mode("overwrite").format("jdbc").options(jdbc_options).save()
rec_diff.show()
rec_diff.write.mode("append").format("jdbc").options(jdbc_options).save()

结果是这样的:

+----------------+----+

|clientMacAddr|var1|

+----------------+----+

+----------------+----+

谢谢。

最佳答案

你的结果是空的,因为 spark 是惰性的。在您将数据收集到驱动程序(减少、计数、收集、显示...)或将数据写入磁盘(写入、保存...)之前,它不会执行任何操作。

因此,当您调用 rec_diff.show() 时,您的 mysql 表只会被读取并与 pre_res 进行比较。那时你已经将 pre_res 写入 mysql 表,因此 pre_res 包含与你的 mysql 表相同的数据,这导致差异为空。

在覆盖 mysql 表(反转代码的第 4 行和第 5 行)之前尝试显示(或收集或写入)您的差异,您会看到差异。

跟进:

这意味着用 spark 覆盖您的输入是个坏主意。原因很简单,spark 是懒惰的(永远记住这一点),在你写东西之前不会读取任何东西。那时 spark 将删除文件以用您的数据替换它并开始读取...您刚刚删除的文件。这一切背后的真正原因是 spark 旨在处理比任何内存都大得多的数据集。因此,它旨在以小批量(执行程序任务)读取和处理您的数据,并逐步写入结果,这与覆盖输入不兼容。

您需要做的是将数据写入临时文件(例如 hdfs parquet 会非常高效)。请注意,有一个类似的线程 here .您尝试执行的操作将编码如下:

var mysql_table = spark.sqlContext.read.format("jdbc").options(jdbc_options).load()    
val list = pre_res.select("clientMacAddr").rdd.map(x => x.toString.substring(1,18)).collect()    
val rec_diff = mysql_table.filter(x => !(list.contains(x.apply(0).toString)))
rec_diff.write.parquet("somewhere")
val saved_rec_diff = spark.sqlContext.read.parquet("somewhere")
saved_rec_diff.show()
saved_rec_diff.write.mode("append").format("jdbc").options(jdbc_options).save()

关于mysql - 在spark中将数据保存到MySQL后DataFrame变空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47151255/

相关文章:

mysql从另一个表中选择一行的多列

MySql 存储过程的参数与影响的列同名,这可能吗?

mysql - APIMan 与 Tomcat 和 MySQL

scala - 如何使用 Mockito 在 Scala 对象中模拟函数?

python - 如何在scala中实现Python的norm.expect

scala - 使用hadoop parquet处理大数据到CSV输出

python - 用 DataFrame 中的 None/null 值替换空字符串

MYSQL 奇怪的语法错误,带/不带分号

scala - 在scala中动态创建变量

linux - Apache Zeppelin 配置与 Spark