java - Possible race condition in the driver

Tags: java multithreading amazon-web-services apache-spark elastic-map-reduce

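I have a Spark job that, on each run, processes several folders on S3 and stores its state in DynamoDB. In other words, we run the job once a day; it looks for new folders added by another job, transforms them one by one, and writes the state to DynamoDB. Here is rough pseudocode: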

object App {
  val allFolders = S3Folders.list()
  val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)
  Transformer.run(foldersToProcess)
}

object Transformer {
  def run(folders: List[String]): Unit = {
    val sc = new SparkContext()
    folders.foreach(process(sc, _))
  }

  def process(sc: SparkContext, folder: String): Unit = ???  // transform and write to S3
}

This approach works well when S3Folders.list() returns a relatively small number of folders (up to a few thousand). When it returns more (4-8K), we very often see the following error, which at first glance has nothing to do with Spark:
17/10/31 08:38:20 ERROR ApplicationMaster: User class threw exception: shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4188)
        at shadeaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865)
        at me.chuwy.transform.S3Folders$.com$chuwy$transform$S3Folders$$isGlacierified(S3Folders.scala:136)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.S3Folders$.list(S3Folders.scala:112)
        at me.chuwy.transform.Main$.main(Main.scala:22)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused by: shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.read1(BufferedReader.java:210)
        at java.io.BufferedReader.read(BufferedReader.java:286)
        at java.io.Reader.read(Reader.java:140)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:186)
        ... 36 more

With a large number of folders (~20K) this happens every time, and the job cannot start.

Previously we had a very similar but much more frequent error, when getFoldersToProcess performed a GetItem for every folder in allFolders and therefore took much longer:
17/09/30 14:46:07 ERROR ApplicationMaster: User class threw exception: shadeaws.AbortedException:
shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)
        at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1240)
        at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:802)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1503)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1226)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2089)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2065)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1173)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1149)
        at me.chuwy.tranform.sdk.Manifest$.contains(Manifest.scala:179)
        at me.chuwy.tranform.DynamoDBState$$anonfun$getUnprocessed$1.apply(ProcessManifest.scala:44)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.DynamoDBState$.getFoldersToProcess(DynamoDBState.scala:44)
        at me.chuwy.transform.Main$.main(Main.scala:19)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
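
The earlier variant described above issued one GetItem per folder, so the time spent before Spark starts grew with the number of folders. A minimal sketch of the alternative shape, assuming the set of already-processed folders has been loaded beforehand in one bulk read (how it is loaded is outside this sketch, and FoldersToProcessSketch and foldersToProcess are hypothetical names, not part of the original job):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FoldersToProcessSketch {

    /**
     * Returns the folders that are not in the processed set, preserving
     * the order of allFolders. The lookup is in memory, so no request is
     * made per folder.
     */
    public static List<String> foldersToProcess(List<String> allFolders, Set<String> processed) {
        List<String> result = new ArrayList<>();
        for (String folder : allFolders) {
            if (!processed.contains(folder)) {
                result.add(folder);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical folder names, for illustration only.
        List<String> allFolders = Arrays.asList("run-1", "run-2", "run-3");
        Set<String> processed = new HashSet<>(Arrays.asList("run-1"));
        System.out.println(foldersToProcess(allFolders, processed));
    }
}
```

This only shortens the state-fetching step; it does not explain or remove whatever aborts the requests.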

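I believe the current error has nothing to do with XML parsing or an invalid response, but instead originates from some race condition inside Spark, because:
  • There is a clear connection between the time "state fetching" takes and the chance of failure
  • The traceback has an underlying AbortedException, which AFAIK is caused by a swallowed InterruptedException, which would mean that something inside the JVM (spark-submit or even YARN) calls Thread.sleep for the main thread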
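
The second point can be illustrated with a small stand-alone sketch. Both traces fail in SdkFilterInputStream.abortIfNeeded, which is called from read. Assuming that check behaves like a test of the calling thread's interrupt flag, a read on a thread that has been interrupted (via Thread.interrupt()) fails even though nothing is wrong with the response itself. The classes below are hypothetical stand-ins written for this illustration, not the SDK's own code:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class InterruptAbortSketch {

    /** Hypothetical stand-in for the SDK's AbortedException. */
    public static class AbortedReadException extends RuntimeException {
        public AbortedReadException(String message) {
            super(message);
        }
    }

    /** Stream that refuses to read once the calling thread has been interrupted. */
    public static class InterruptAwareStream extends FilterInputStream {
        public InterruptAwareStream(InputStream in) {
            super(in);
        }

        private void abortIfNeeded() {
            // Thread.interrupted() returns the interrupt flag and clears it.
            if (Thread.interrupted()) {
                throw new AbortedReadException("thread was interrupted before read");
            }
        }

        @Override
        public int read() throws IOException {
            abortIfNeeded();
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            abortIfNeeded();
            return super.read(b, off, len);
        }
    }

    /** Reads one byte from a valid stream; returns "ok" or "aborted". */
    public static String tryRead(boolean interruptFirst) {
        InputStream in = new InterruptAwareStream(new ByteArrayInputStream(new byte[] {42}));
        if (interruptFirst) {
            // Simulates some other component interrupting this thread.
            Thread.currentThread().interrupt();
        }
        try {
            in.read();
            return "ok";
        } catch (AbortedReadException e) {
            return "aborted";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRead(false)); // prints "ok"
        System.out.println(tryRead(true));  // prints "aborted"
    }
}
```

If this is indeed the mechanism, logging Thread.currentThread().isInterrupted() just before the S3 or DynamoDB calls would show whether the driver's main thread is already flagged when the request starts.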

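Right now I'm using EMR AMI 5.5.0, Spark 2.1.0, and a shaded AWS SDK 1.11.208, but I had a similar error with AWS SDK 1.10.75.

I'm deploying this job on EMR via command-runner.jar spark-submit --deploy-mode cluster --class ....

Does anyone know where this exception comes from and how to fix it?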

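Best answer

foreach does not guarantee ordered computation. It applies the operation to each element of an RDD, which means it instantiates for every element, and that in turn may overwhelm the executors.

Regarding "java - Possible race condition in the driver", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47044084/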
