amazon-web-services - Cannot run scripts correctly in an AWS Glue PySpark Dev Endpoint

Tags: amazon-web-services pyspark aws-glue

I have configured an AWS Glue development endpoint and can successfully connect to it in a pyspark REPL shell, following https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

Unlike the example given in the AWS documentation, I receive warnings when starting the session, and various operations on AWS Glue DynamicFrame structures later fail. Here is the complete log of starting the session - note the warnings about spark.yarn.jars and PyGlue.zip:

Python 2.7.12 (default, Sep  1 2016, 22:14:00)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/03/02 14:18:58 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
18/03/02 14:19:03 WARN Client: Same path resource file:/usr/share/aws/glue/etl/python/PyGlue.zip added multiple times to distributed cache.
18/03/02 14:19:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Sep  1 2016 22:14:00)
SparkSession available as 'spark'.
>>>

Many operations work as I expect, but I also get some unwelcome exceptions. For example, I can load data from my Glue catalog and inspect its structure and the data in it, but I cannot apply a Map to it or convert it to a DataFrame. Here is the full log of my run (apart from the longest error message). The first few commands and setup steps all work fine, but the last two operations fail:
>>> import sys
>>> from awsglue.transforms import *
>>> from awsglue.utils import getResolvedOptions
>>> from pyspark.context import SparkContext
>>> from awsglue.context import GlueContext
>>> from awsglue.job import Job
>>>
>>> glueContext = GlueContext(spark)
>>> # Receives a string of the format yyyy-mm-dd hh:mi:ss.nnn and returns the first 10 characters: yyyy-mm-dd
... def TruncateTimestampString(ts):
...   ts = ts[:10]
...   return ts
...
>>> TruncateTimestampString('2017-03-05 06:12:08.376')
'2017-03-05'
>>>
>>> # Given a record with a timestamp property returns a record with a new property, day, containing just the date portion of the timestamp string, expected to be yyyy-mm-dd.
... def TruncateTimestamp(rec):
...   rec['day'] = TruncateTimestampString(rec['timestamp'])
...   return rec
...
>>> # Get the history datasource - WORKS WELL BUT LOGS log4j2 ERROR
>>> datasource_history_1 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "history", transformation_ctx = "datasource_history_1")
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>>> # Tidy the history datasource - WORKS WELL
>>> history_tidied = datasource_history_1.drop_fields(['etag', 'jobmaxid', 'jobminid', 'filename']).rename_field('id', 'history_id')
>>> history_tidied.printSchema()
root
|-- jobid: string
|-- spiderid: long
|-- timestamp: string
|-- history_id: long

>>> # Trivial observation of the SparkSession objects
>>> SparkSession
<class 'pyspark.sql.session.SparkSession'>
>>> spark
<pyspark.sql.session.SparkSession object at 0x7f8668f3b650>
>>> 
>>> 
>>> # Apply a mapping to the tidied history datasource. FAILS
>>> history_mapped = history_tidied.map(TruncateTimestamp)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 101, in map
File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 105, in mapPartitionsWithIndex
File "/usr/lib/spark/python/pyspark/rdd.py", line 2419, in __init__
    self._jrdd_deserializer = self.ctx.serializer
AttributeError: 'SparkSession' object has no attribute 'serializer'
>>> history_tidied.toDF()
ERROR
Huge error log and stack trace follows, longer than my console can remember. Here's how it finishes:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 128, in toDF
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"

I believe I am following the instructions Amazon gives in their Dev Endpoint REPL documentation, but with these fairly simple operations (DynamicFrame.map and DynamicFrame.toDF) failing, I am working in the dark when I come to actually run the job (which appears to succeed, but the output of my DynamicFrame.printSchema() and DynamicFrame.show() commands does not appear in the CloudWatch logs for the run).

Does anyone know what I need to do to fix my REPL environment so that I can properly test pyspark AWS Glue scripts?

Best Answer

AWS Support finally responded to my enquiry about this issue. Here is the response:

On researching further, I found that this is a known issue with the PySpark shell and glue service team is already on it. The fix should be deployed soon, however currently there's no ETA that I can share with you.

Meanwhile here's a workaround: before initializing Glue context, you can do

>>> newconf = sc._conf.set("spark.sql.catalogImplementation", "in-memory")
>>> sc.stop()
>>> sc = sc.getOrCreate(newconf)

and then instantiate glueContext from that sc.



I can confirm that this works for me. Here is the script I was able to run:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# New recommendation from AWS Support 2018-03-22.
# sc is the SparkContext that the REPL shell pre-defines on startup.
newconf = sc._conf.set("spark.sql.catalogImplementation", "in-memory")
sc.stop()
sc = sc.getOrCreate(newconf)
# End AWS Support workaround

glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

datasource_history_1 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "history", transformation_ctx = "datasource_history_1")

def DoNothingMap(rec):
  return rec

history_mapped = datasource_history_1.map(DoNothingMap)
history_df = history_mapped.toDF()
history_df.show()
history_df.printSchema()

Previously the .map() and .toDF() calls would fail.

I have asked AWS Support to notify me when this issue is resolved so that the workaround is no longer needed.
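As an aside, the per-record transform attempted in the question can be checked in plain Python before handing it to DynamicFrame.map, since the map callback receives each record as a dict-like object. This is a minimal sketch (the field names match the schema printed above); note that the dictionary keys must be quoted strings ('day', 'timestamp'), which the original REPL session omitted and which would have raised a NameError once the map actually executed:

```python
def truncate_timestamp(rec):
    """Add a 'day' property holding the yyyy-mm-dd prefix of the
    'timestamp' string, expected in the format yyyy-mm-dd hh:mi:ss.nnn."""
    rec['day'] = rec['timestamp'][:10]
    return rec

# Quick local check with a record shaped like one row of the history table.
record = {'history_id': 1, 'jobid': 'j-1', 'timestamp': '2017-03-05 06:12:08.376'}
print(truncate_timestamp(record)['day'])  # 2017-03-05
```

Testing the callback this way isolates record-level bugs from the session-level HiveSessionState/serializer errors the workaround above addresses.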

This question, "amazon-web-services - Cannot run scripts correctly in an AWS Glue PySpark Dev Endpoint", is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/49071424/
