hadoop - Is the S3NativeFileSystem call killing my Pyspark application on AWS EMR 4.6.0?

Tags: hadoop amazon-web-services amazon-s3 pyspark emr

My Spark application fails when it has to access numerous CSV files (roughly 1,000 of them at about 63 MB each) from S3 and pipe them into a Spark RDD. The actual process of splitting up the CSVs seems to work, but an extra function call to S3NativeFileSystem appears to cause an error and crash the job.

First, here is my PySpark application:

from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
import time

startTime = float(time.time())

dataPath = 's3://PATHTODIRECTORY/'
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "MYKEY")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "MYSECRETKEY")

def buildSchemaDF(tableName, columnList):
    currentRDD = sc.textFile(dataPath + tableName).map(lambda line: line.split("|"))
    currentDF = currentRDD.toDF(columnList)
    return currentDF

loadStartTime = float(time.time())
lineitemDF = buildSchemaDF('lineitem*', ['l_orderkey','l_partkey','l_suppkey','l_linenumber','l_quantity','l_extendedprice','l_discount','l_tax','l_returnflag','l_linestatus','l_shipdate','l_commitdate','l_receiptdate','l_shipinstruct','l_shipmode','l_comment'])
lineitemDF.registerTempTable("lineitem")
loadTimeElapsed = float(time.time()) - loadStartTime

queryStartTime = float(time.time())

qstr = """
    SELECT
        lineitem.l_returnflag,
        lineitem.l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_discount) as sum_disc,
        sum(l_tax) as sum_tax,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(l_orderkey) as count_order
    FROM
        lineitem
    WHERE
        l_shipdate <= '19981001'
    GROUP BY
        l_returnflag,
        l_linestatus
    ORDER BY
        l_returnflag,
        l_linestatus
    """
tpch1DF = sqlContext.sql(qstr)

queryTimeElapsed = float(time.time()) - queryStartTime
totalTimeElapsed = float(time.time()) - startTime

tpch1DF.show()

queryResults = [qstr, loadTimeElapsed, queryTimeElapsed, totalTimeElapsed]
distData = sc.parallelize(queryResults)
distData.saveAsTextFile(dataPath + 'queryResults.csv')

print 'Load Time: ' + str(loadTimeElapsed)
print 'Query Time: ' + str(queryTimeElapsed)
print 'Total Time: ' + str(totalTimeElapsed)

To take it step by step, I start by spinning up a Spark EMR cluster with the following AWS CLI command (carriage returns added for readability):

aws emr create-cluster --name "Big TPCH Spark cluster2" --release-label emr-4.6.0 
--applications Name=Spark --ec2-attributes KeyName=blazing-test-aws 
--log-uri s3://aws-logs-132950491118-us-west-2/elasticmapreduce/j-1WZ39GFS3IX49/ 
--instance-type m3.2xlarge --instance-count 6 --use-default-roles

Once the EMR cluster finishes provisioning, I copy the Pyspark application onto the master node at "/home/hadoop/pysparkApp.py".
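A minimal sketch of that copy step with scp (assuming the default hadoop user and the key pair named in the create-cluster command; the key file, master public DNS, and local path are placeholders):

# hypothetical copy command: key file, master DNS and local path are placeholders
scp -i blazing-test-aws.pem pysparkApp.py hadoop@<master-public-dns-name>:/home/hadoop/pysparkApp.py

Once it is copied over, I can add the spark-submit step: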

aws emr add-steps --cluster-id j-1DQJ8BDL1394N --steps
Type=spark,Name=SparkTPCHTests,Args=[--deploy-mode,cluster,
--conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,5,
--executor-cores,5,--executor-memory,20g,/home/hadoop/tpchSpark.py],
ActionOnFailure=CONTINUE

Now, if I run this step over only a few of the CSV files described above, the final results are generated, but the script still claims to have failed.

I believe this has to do with the extra calls to S3NativeFileSystem, but I am not certain. These are the Yarn log messages that led me to that conclusion. The first call appears to work just fine:

16/05/15 23:18:00 INFO HadoopRDD: Input split: s3://data-set-builder/splitLineItem2/lineitemad:0+64901757
16/05/15 23:18:00 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[ED8011CE4E1F6F18], ServiceEndpoint=[https://data-set-builder.s3-us-west-2.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[77.956], HttpRequestTime=[77.183], HttpClientReceiveResponseTime=[20.028], RequestSigningTime=[0.229], CredentialsRequestTime=[0.003], ResponseProcessingTime=[0.128], HttpClientSendRequestTime=[0.35],

While the second does not seem to execute properly, resulting in "Partial Results" (a 206 error):

16/05/15 23:18:00 INFO S3NativeFileSystem: Opening 's3://data-set-builder/splitLineItem2/lineitemad' for reading
16/05/15 23:18:00 INFO latency: StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[10BDDE61AE13AFBE], ServiceEndpoint=[https://data-set-builder.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[296.86], HttpRequestTime=[295.801], HttpClientReceiveResponseTime=[293.667], RequestSigningTime=[0.204], CredentialsRequestTime=[0.002], ResponseProcessingTime=[0.34], HttpClientSendRequestTime=[0.337],
16/05/15 23:18:02 INFO ApplicationMaster: Waiting for spark context initialization ...

I do not understand why it even makes a second call to S3NativeFileSystem when the first call appears to have responded effectively and even split the file. Is this a product of my EMR configuration? I know S3Native has file-limit issues and that a straight S3 call is optimal, which is what I have tried to do, but this call seems to be there no matter what I do. Please help!

Also, here are some other relevant error messages from my Yarn logs:

1)

16/05/15 23:19:22 ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application.
16/05/15 23:19:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Timed out waiting for SparkContext.)

2)

16/05/15 23:19:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:162)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:226)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/05/15 23:19:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401
16/05/15 23:19:22 WARN TaskMemoryManager: leak 32.3 MB memory from org.apache.spark.unsafe.map.BytesToBytesMap@762be8fe
16/05/15 23:19:22 ERROR Executor: Managed memory leak detected; size = 33816576 bytes, TID = 14
16/05/15 23:19:22 ERROR Executor: Exception in task 13.0 in stage 1.0 (TID 14)
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/3a/temp_shuffle_b9001fca-bba9-400d-9bc4-c23c002e0aa9 (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Best Answer

The order of precedence for Spark configuration is:

SparkContext(代码/应用程序)> Spark-submit > Spark-defaults.conf

A couple of things to point out here -

  1. Use YARN cluster as the deploy mode and master in your spark-submit command -

    spark-submit --deploy-mode cluster --master yarn ...

    OR

    spark-submit --master yarn-cluster ...

  2. Remove the "local" string from the line sc = SparkContext("local", "Simple App") in your code. Initialize the Spark context with conf = SparkConf().setAppName(appName) followed by sc = SparkContext(conf=conf) instead, as sketched below.
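Applied to the script in the question, the initialization might look roughly like this (a minimal sketch; the app name is carried over from the original code and the rest of the application is assumed unchanged):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# let spark-submit / YARN supply the master instead of hard-coding "local"
conf = SparkConf().setAppName("Simple App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)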

Reference - http://spark.apache.org/docs/latest/programming-guide.html
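Putting the two recommendations together, the add-steps call from the question might become something like the following (a sketch only, with carriage returns added for readability; the cluster id and resource settings are carried over from the question):

aws emr add-steps --cluster-id j-1DQJ8BDL1394N --steps
Type=spark,Name=SparkTPCHTests,Args=[--deploy-mode,cluster,--master,yarn,
--conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,5,
--executor-cores,5,--executor-memory,20g,/home/hadoop/tpchSpark.py],
ActionOnFailure=CONTINUE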

The original question, "hadoop - Is the S3NativeFileSystem call killing my Pyspark application on AWS EMR 4.6.0?", can be found on Stack Overflow: https://stackoverflow.com/questions/37259403/
