apache-spark - 将 MySQL 表转换为 Parquet 时出现 Spark 异常

标签 apache-spark apache-spark-sql parquet

我正在尝试使用 spark 1.6.2 将 MySQL 远程表转换为 Parquet 文件。

该过程运行 10 分钟,填满内存,然后从以下消息开始:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval

最后失败并出现此错误:
ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded

我正在使用以下命令在 spark-shell 中运行它:
spark-shell --packages mysql:mysql-connector-java:5.1.26 org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()

dataframe_mysql.saveAsParquetFile("name.parquet")

我将最大执行程序内存限制为 12G。有没有办法强制将 Parquet 文件写入“小”块以释放内存?

最佳答案

问题似乎是您在使用 jdbc 连接器读取数据时没有定义分区。

默认情况下,从 JDBC 读取不是分布式的,因此要启用分布式,您必须设置手动分区。您需要一个作为良好分区键的列,并且您必须预先了解分布情况。

这显然是您的数据的样子:

root 
|-- id: long (nullable = false) 
|-- order_year: string (nullable = false) 
|-- order_number: string (nullable = false) 
|-- row_number: integer (nullable = false) 
|-- product_code: string (nullable = false) 
|-- name: string (nullable = false) 
|-- quantity: integer (nullable = false) 
|-- price: double (nullable = false) 
|-- price_vat: double (nullable = false) 
|-- created_at: timestamp (nullable = true) 
|-- updated_at: timestamp (nullable = true)
order_year对我来说似乎是一个很好的候选人。 (根据您的评论,您似乎有大约 20 年的时间)
import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = ???

val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???

// Manual partitioning
val partitionColumn: String = "order_year"

val options: Map[String, String] = Map("driver" -> driver,
  "url" -> connectionUrl,
  "dbtable" -> query,
  "user" -> userName,
  "password" -> password,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> "0",
  "upperBound" -> "3000",
  "numPartitions" -> "300"
)

val df = sqlContext.read.format("jdbc").options(options).load()

PS: partitionColumn , lowerBound , upperBound , numPartitions :
如果指定了其中任何选项,则必须全部指定这些选项。

现在您可以保存您的 DataFrame Parquet 。

关于apache-spark - 将 MySQL 表转换为 Parquet 时出现 Spark 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40290478/

相关文章:

apache-spark - org.apache.zeppelin.interpreter.InterpreterException:Sparkr没有响应

postgresql - Py4JJavaError java.lang.NullPointerException org.apache.spark.sql.DataFrameWriter.jdbc

postgresql - 为什么 Parquet 而不是像 Postgres 这样的 RDBMS

java - 从 Scala 到 Java 1.8

scala - Spark DataFrame groupBy

performance - DataFrame/Dataset groupBy 行为/优化

java - PySpark 中不存在方法 showString([class java.lang.Integer, class java.lang.Integer, class java.lang.Boolean])

hadoop - PyArrow 中的 hdfs.connect() 与 HdfsClient

scala - 无法将有序数据写入 Spark 中的 Parquet

apache-spark - 奇怪的性能问题 Spark LSH MinHash approxSimilarityJoin