我有 Airflow 作业,它们在 EMR 集群上运行良好。我需要的是,假设我有 4 个 Airflow 作业需要 EMR 集群,假设 20 分钟才能完成任务。为什么我们不能在 DAG 运行时创建一个 EMR 集群,一旦作业完成,它就会终止创建的 EMR 集群。
最佳答案
当然,那将是对资源最有效的利用。让我警告你:这里面有很多细节;我会尽力列出尽可能多的内容。我鼓励您添加自己的综合答案,列出您遇到的任何问题和解决方法(一旦您解决了这个问题)
关于集群创建/终止
对于集群的创建和终止,您有
EmrCreateJobFlowOperator
和EmrTerminateJobFlowOperator
分别不用担心
AWS
SecretAccessKey
(并完全依赖IAM
Roles );在Airflow
中实例化任何AWS
相关的hook
或operator
将 automatically fall-back to underlyingEC2
's attachedIAM
Role如果您没有使用 EMR-Steps API 进行作业提交,那么您还必须手动感知上述两种操作,使用
Sensors
.已经有一个用于轮询创建阶段的传感器,称为EmrJobFlowSensor
你也可以稍微修改它来创建一个终止传感器您在
job_flow_extra
中传递您的集群配置 JSON .您还可以在Connection
中传递配置的(如my_emr_conn
)extra
param ,但不要使用它,因为它经常破坏SQLAlchemy
ORM 加载(因为它是一个很大的json
)
关于作业提交
您可以使用 EMR-Steps API 将作业提交到
Emr
,这可以在集群创建阶段(在 Cluster-Configs JSON 中)或之后使用add_job_flow_steps()
完成.甚至还有一个emr_add_steps_operator()
在Airflow
中也需要EmrStepSensor
.您可以在AWS
docs 中阅读更多相关信息你可能还必须使用command-runner.jar
对于特定于应用程序的情况(如
Hive
、Livy
),您可以使用它们的特定方式。例如,您可以使用HiveServer2Hook
提交一个Hive
作业。这是一个棘手的部分:run_job_flow()
call (在集群创建阶段制作)只给你一个job_flow_id
(cluster-id)。你必须使用describe_cluster()
call使用EmrHook
获取主节点的private-IP。使用它,您将能够 programmatically create aConnection
(例如Hive Server 2 Thrift
connection )并使用它来将您的计算提交到集群。在完成您的工作流程之前,不要忘记删除这些连接(为了优雅)。最后是用于与集群交互的老式 bash。为此,您还应该传递
EC2
key pair during cluster creation phase。 .之后,您可以以编程方式创建一个SSH
connection并使用它(使用SSHHook
或SSHOperator
)在集群上运行作业。在Airflow
中阅读更多关于 SSH 的内容 here特别是提交
Spark
作业到远程Emr
集群,阅读this discussion
关于apache-spark - 使用 Airflow dag run 创建 EMR 集群,任务完成后 EMR 将终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55227683/