sql-server - Spark bulk insert into MS-SQL

Tags: sql-server scala apache-spark hadoop bulkinsert

I have a table in Hive containing 920,649,653 records, and I want to insert that table into MS-SQL.

I am using the azure-sqldb-spark library.

spark2-shell --master=yarn --jars azure-sqldb-spark-1.0.2-jar-with-dependencies.jar

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Id_Util_Donnees_Texte", java.sql.Types.INTEGER, 0, 0)
bulkCopyMetadata.addColumnMetadata(2, "Id_Util", java.sql.Types.INTEGER, 0, 0)
bulkCopyMetadata.addColumnMetadata(3, "Id_From", java.sql.Types.INTEGER, 0, 0)
bulkCopyMetadata.addColumnMetadata(4, "IdD", java.sql.Types.INTEGER, 0, 0)
bulkCopyMetadata.addColumnMetadata(5, "Valeur", java.sql.Types.NVARCHAR, 8000, 0)
bulkCopyMetadata.addColumnMetadata(6, "dCollecte", java.sql.Types.TIMESTAMP, 0, 0)
bulkCopyMetadata.addColumnMetadata(7, "dInsertion", java.sql.Types.TIMESTAMP, 0, 0)
bulkCopyMetadata.addColumnMetadata(8, "dMAJ", java.sql.Types.TIMESTAMP, 0, 0)
bulkCopyMetadata.addColumnMetadata(9, "id_marque", java.sql.Types.INTEGER, 0, 0)


val df = spark.table("analyses_tmp.import_sofinco_Util_last_Donnees_Texte").coalesce(10)


val bulkCopyConfig = Config(Map(
  "url"               -> "db_url",
  "user"              -> "user",
  "password"          -> "*******",
  "databaseName"      -> "Hadoop",
  "dbTable"           -> "dbo.Util_Last_Donnees_Texte",
  "bulkCopyBatchSize" -> "4000",
  "bulkCopyTableLock" -> "false",
  "bulkCopyTimeout"   -> "6000"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
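
For what it is worth, coalesce(10) caps the write at 10 partitions, and the connector streams each partition over its own bulk copy connection in batches of bulkCopyBatchSize rows. One variation I have considered but not validated is repartitioning for more parallel streams; the partition count of 50 below is purely illustrative:

// Repartition instead of coalesce to get evenly sized partitions;
// each output partition becomes one parallel bulk copy stream.
val dfParallel = spark.table("analyses_tmp.import_sofinco_Util_last_Donnees_Texte").repartition(50)

dfParallel.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)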

After two hours I get the following error and the whole insert is rolled back:

9/04/11 20:17:22 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1848)
  at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
  at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1761)
  at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1931)
  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
  at org.apache.spark.SparkContext.stop(SparkContext.scala:1930)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:106)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2675)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2675)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2675)
  at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3239)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3235)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2674)
  at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.bulkCopyToSqlDB(DataFrameFunctions.scala:72)
  ... 51 elided

Do you see any reason why this error occurs? Do you have any suggestions about my setup? Is there anything I should change to speed up the process?

Best answer

You can force YARN to skip its physical/virtual memory checks (which may be what killed the application) by setting the following properties in yarn-site.xml:

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>

<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
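
If the application was indeed killed because an executor exceeded its YARN memory limits (that is what the two checks above enforce), an alternative to disabling the checks is to give each executor more off-heap headroom when launching the shell. A minimal sketch; the overhead value (in MB) is illustrative only:

spark2-shell --master=yarn \
  --jars azure-sqldb-spark-1.0.2-jar-with-dependencies.jar \
  --conf spark.yarn.executor.memoryOverhead=2048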

Hope this helps.

Thanks.

Regarding sql-server - Spark bulk insert into MS-SQL, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55646811/
