airflow - 在不启动新集群的情况下从 Airflow 触发 Databricks 作业

标签 airflow databricks

我正在使用 Airflow 来触发数据 block 上的作业。我有许多运行数据 block 作业的 DAG,我希望只使用一个集群而不是多个集群,因为据我了解,这将降低这些任务产生的成本。

使用 DatabricksSubmitRunOperator有两种方法可以在数据 block 上运行作业。使用正在运行的集群通过 id 调用它

'existing_cluster_id' : '1234-567890-word123',

或启动一个新集群
'new_cluster': {
    'spark_version': '2.1.0-db3-scala2.11',
    'num_workers': 2
  },

现在我想尽量避免为每个任务启动一个新的集群,但是集群在停机期间关闭,因此它不再通过它的 id 可用,我会收到一个错误,所以我认为唯一的选择是新集群。

1) 有没有办法让集群即使在关闭时也可以通过 id 调用?

2)人们只是让集群保持活力吗?

3)或者我完全错了,为每个任务启动集群不会产生更多成本?

4)有什么我完全错过的吗?

最佳答案

基于@YannickSSE 的评论回复的更新
我不使用数据 block ;您是否可以通过与您可能期望或可能不期望正在运行的集群相同的 id 启动一个新集群,并且在它正在运行的情况下让它成为空操作?也许不是,或者你可能不会问这个。响应:不,当启动一个新集群时你不能给出一个 id。

您能否编写一个 python 或 bash 运算符来测试集群是否存在? (响应:这将是一个测试作业提交......不是最好的方法。)如果它找到它并成功,下游任务将使用现有集群 ID 触发你的作业,但如果它没有,另一个下游任务可以使用 trigger_rule all_failed执行相同的任务,但使用新集群。那么这两个任务 DatabricksSubmitRunOperator s 可以有一个下游任务 trigger_rule one_success 。 (响应:或使用分支运算符来确定执行的运算符。)

这可能并不理想,因为我想您的集群 ID 会不时更改,导致您必须跟上。 ......集群是该运算符(operator)的databricks钩子(Hook)连接的一部分,并且可以更新吗?也许您想在需要它的任务中将其指定为 {{ var.value.<identifying>_cluster_id }} 并将其更新为 Airflow 变量。 (响应:集群 id 不在 Hook 中,因此变量或 DAG 文件在发生更改时必须更新。)

关于airflow - 在不启动新集群的情况下从 Airflow 触发 Databricks 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54561640/

相关文章:

Airflow 分支错误,类型错误 : 'NoneType' object is not iterable

Python 操作符中的 Airflow 宏

python - 是否可以构建扩展 Airflow DAG 任务的树形结构? (动态任务映射输出上的动态任务映射)

apache-spark - 如何在 Databricks(社区版)上本地保存 Great_Expectations 套件

python - Airflow 从与执行文件相同的指定路径中的文件导入时出现问题

python - 将返回值从运算符传递给 Airflow 中的后续运算符

使用 R 从 Microsoft Azure 读取 csv 文件

azure pyspark从jar注册udf失败UDFRegistration

azure - 是否可以从databricks访问Azure表服务

azure - Spark 驱动程序在 900 秒内启动失败