scala - Apache Spark Scala - Hive insert throws "too large frame error"

Tags: scala apache-spark hadoop hive apache-spark-sql

I am trying to insert data into Hive with the code below, but it always fails with

java.lang.IllegalArgumentException: Too large frame

I tried adjusting the memory settings, but that did not help.

Here are the details.

Error stack trace:

[Stage 4:=====================================================>(999 + 1) / 1000]18/11/27 09:59:44 WARN TaskSetManager: Lost task 364.0 in stage 4.0 (TID 1367, spark-node, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.shuffle.FetchFailedException: Too large frame: 5587345928
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:286)
... 8 more
Caused by: java.lang.IllegalArgumentException: Too large frame: 5587345928
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more

Below is my sample code:

 //create spark session first
    val spark = SparkSession.builder()
      .appName("MSSQLIngestion")
      .master("yarn")
      .config("spark.sql.caseSensitive", "false")
      .config("spark.sql.shuffle.partitions", "1000")
      .config("spark.shuffle.spill", "true")
      .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate();

    spark.sql("set hive.exec.parallel=true")

    // Create a Properties() object to hold the parameters.
    val connectionProperties = new Properties()
    connectionProperties.setProperty("Driver", driverClass)
    connectionProperties.setProperty("fetchSize", "100000")

    // read data from JDBC server and construct a dataframe
    val jdbcDF1 = spark.read.jdbc(url = jdbcUrl, table = "(select * from jdbcTable) e", properties = connectionProperties)

    val jdbcDF = jdbcDF1.repartition(1000)

    val count = jdbcDF.count()

    println("red "+count+" records from sql server and started loading into hive")

    // if count > 0 then insert the records into Hive
    if (count > 0) {
      // create spark temporary table
      jdbcDF.createOrReplaceTempView("sparkTempTable")
      // insert into Hive external table
      spark.sql("insert into externalTable partition (hivePartitionCol) select * from sparkTempTable  distribute by  hivePartitionCol ")
    }
    println("completed the job for loading the data into hive")

    spark.stop()

Here is my spark-submit command:

spark-submit --class com.generic.MSSQLHiveIngestion --master yarn --num-executors 8 --executor-cores 2 --executor-memory 16G --driver-memory 8G --driver-cores 4 --conf spark.yarn.executor.memoryOverhead=1G data-ingestion.jar

Best answer

When the size of a shuffle data block exceeds Spark's 2 GB limit, Spark cannot process it and fails with an error like the following.

    Caused by: java.lang.IllegalArgumentException: Too large frame: 5211883372140375593
            at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
            at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
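
As a quick sanity check (a minimal sketch, not part of the original answer), you can compare the frame size from the question's stack trace against Int.MaxValue, which is the ceiling enforced by Spark's TransportFrameDecoder:

    // Hypothetical check: the frame reported in the question's stack trace
    // is far beyond the ~2 GB (Int.MaxValue bytes) frame limit.
    val frameSize = 5587345928L            // bytes, taken from the stack trace above
    val maxFrame  = Int.MaxValue.toLong    // 2147483647 bytes, roughly 2 GB
    println(frameSize > maxFrame)          // true -> "Too large frame"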

Option 1 (Spark >= 2.2):

  1. Spark 2.2 and later handle this problem better than earlier Spark versions. See SPARK-19659 for details.

  2. Use the following Spark configuration: increase spark.sql.shuffle.partitions from the default of 200 to a value greater than 2001, and set spark.default.parallelism to the same value as spark.sql.shuffle.partitions (see the sketch after this list).
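
For illustration, here is a minimal sketch of those settings applied to the session builder from the question. The value 2001 comes from the answer's suggestion rather than tuning; the commonly cited reason for it is that above 2000 shuffle partitions Spark tracks shuffle block sizes with a more compact map status.

    // A minimal sketch (assumed values): raise the shuffle partition count
    // above 2001 and keep spark.default.parallelism in sync, so individual
    // shuffle blocks stay well under the 2 GB frame limit.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("MSSQLIngestion")                       // name taken from the question
      .config("spark.sql.shuffle.partitions", "2001")  // default is 200
      .config("spark.default.parallelism", "2001")     // keep in sync with the above
      .enableHiveSupport()
      .getOrCreate()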

Option 2:

  1. Identify the DataFrame that is causing the problem.
    a. Add a Spark action (for example df.count()) after each new DataFrame is created.
    b. Print something so you can inspect the DataFrame.
    c. If the print statement does not execute for a given DataFrame, the problem lies with that DataFrame.

  2. Once the DataFrame is identified, repartition it with df.repartition() and then cache it with df.cache() (a short sketch follows this list).

  3. If the data is skewed and you are using a Spark version older than 2.2, modify the code accordingly.
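
For illustration only, a hedged sketch of steps 1 and 2 applied to the question's pipeline; spark, jdbcUrl and connectionProperties are the values defined in the question's code, and the partition count 2001 is an assumption:

    // Hypothetical sketch: force an action after each DataFrame to locate the
    // one that triggers the failure, then repartition and cache it before the
    // Hive insert.
    val jdbcDF = spark.read.jdbc(jdbcUrl, "(select * from jdbcTable) e", connectionProperties)
    println(s"rows read from JDBC: ${jdbcDF.count()}")       // action: if this fails, jdbcDF is the culprit

    val balanced = jdbcDF.repartition(2001).cache()          // smaller, more evenly sized partitions
    println(s"rows after repartition: ${balanced.count()}")  // materializes the cache

    balanced.createOrReplaceTempView("sparkTempTable")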

Regarding "scala - Apache Spark Scala - Hive insert throws too large frame error", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53503668/
