mysql - 将大数据从 MySQL 加载到 Spark 中

标签 mysql apache-spark dataframe

寻找 Spark 理解...

我正在将大量数据从 MySQL 加载到 Spark 中,但它总是死掉:-(

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)

这是我的代码

val query =
  s"""
     (
      select 
      mod(act.AccountID, ${parts}) part,
      p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID
      from DimIdentity as id
      join FactArrival as arr on  arr.IdentityID=id.IdentityID
      join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
      join DimAccount as act on  act.AccountID=event.AccountID
      join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
      join DimDateTime as d on event.DateTimeID=d.DateTimeID
      join DimProperty as p on p.PropertyID=event.EventTypeID
      where
        id.Botness=0 and 
        d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
        (role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
  ) a
  """.stripMargin

val events = sqlContext.read.format("jdbc").
  option("url", sqlURL).
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("zeroDateTimeBehavior", "round").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("dbtable", query).
  option("user", sqlUser).
  option("password", sqlPassword).
  option("partitionColumn", "part").
  option("lowerBound", "0").
  option("upperBound", s"${parts - 1}").
  option("numPartitions", s"${parts}").
  load().as[Activity].toDF

请注意,我正在使用其他答案中建议的partitionColumn、lowerBound、upperBound、numPartitions

我尝试将分区设置为 4 到 512,但它总是死掉。从文件或Mongo中读取相同量的数据没有问题。这是 MySQL 连接器的问题吗?有解决办法吗?

请注意,我发现一个答案建议我避免 Spark,并将查询读入 HDFS 上的文件,然后加载该文件

Multiple Partitions in Spark RDD

这真的是最好的方法吗?

最佳答案

这是我得到的答案...

对我来说,答案是避免 Spark 的 mysql 连接 :-( 我发现很难避免分区引起的崩溃。Mysql 连接需要手动调整分区,并且不会产生任何增加速度。编写将数据读取到大型文本文件中的非 Spark 代码要容易得多,并且在文本文件上调用 Spark。Spark 对于大多数数据源确实很好,但不是 mysql...至少现在还不是

关于mysql - 将大数据从 MySQL 加载到 Spark 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40132635/

相关文章:

mysql - VB.Net DataSet找不到表

mysql - 单独使用预定义语句来进行占位符绑定(bind)是否有点矫枉过正?

c++ - mysql_stmt_execute 出现奇怪的错误

java - 无法直接从 Spark RDD 向 Ignite 写入/保存数据

python - Pandas ;通过划分大型数据帧的最后一列来创建新列

apache-spark - 访问 Spark 中的嵌套数据

php - 如何在php中以表格形式发送电子邮件?

scala - 如何使用 Junit 4 和 Scala 测试异常?

java - 在数据集上调用 withColumn 的成本是多少

python - DatetimeIndex 阻止 DataFrame 从装饰函数返回