我正在使用 Airflow 来触发数据 block 上的作业。我有许多运行数据 block 作业的 DAG,我希望只使用一个集群而不是多个集群,因为据我了解,这将降低这些任务产生的成本。
使用 DatabricksSubmitRunOperator
有两种方法可以在数据 block 上运行作业。使用正在运行的集群通过 id 调用它
'existing_cluster_id' : '1234-567890-word123',
或启动一个新集群
'new_cluster': {
'spark_version': '2.1.0-db3-scala2.11',
'num_workers': 2
},
现在我想尽量避免为每个任务启动一个新的集群,但是集群在停机期间关闭,因此它不再通过它的 id 可用,我会收到一个错误,所以我认为唯一的选择是新集群。
1) 有没有办法让集群即使在关闭时也可以通过 id 调用?
2)人们只是让集群保持活力吗?
3)或者我完全错了,为每个任务启动集群不会产生更多成本?
4)有什么我完全错过的吗?
最佳答案
基于@YannickSSE 的评论回复的更新
我不使用数据 block ;您是否可以通过与您可能期望或可能不期望正在运行的集群相同的 id 启动一个新集群,并且在它正在运行的情况下让它成为空操作?也许不是,或者你可能不会问这个。响应:不,当启动一个新集群时你不能给出一个 id。
您能否编写一个 python 或 bash 运算符来测试集群是否存在? (响应:这将是一个测试作业提交......不是最好的方法。)如果它找到它并成功,下游任务将使用现有集群 ID 触发你的作业,但如果它没有,另一个下游任务可以使用 trigger_rule
all_failed
执行相同的任务,但使用新集群。那么这两个任务 DatabricksSubmitRunOperator
s 可以有一个下游任务 trigger_rule
one_success
。 (响应:或使用分支运算符来确定执行的运算符。)
这可能并不理想,因为我想您的集群 ID 会不时更改,导致您必须跟上。 ......集群是该运算符(operator)的databricks钩子(Hook)连接的一部分,并且可以更新吗?也许您想在需要它的任务中将其指定为 {{ var.value.<identifying>_cluster_id }}
并将其更新为 Airflow 变量。 (响应:集群 id 不在 Hook 中,因此变量或 DAG 文件在发生更改时必须更新。)
关于airflow - 在不启动新集群的情况下从 Airflow 触发 Databricks 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54561640/