hadoop - 在运算符之外使用 Airflow 宏

标签 hadoop macros airflow

有没有办法在任何运算符之外使用 Airflow 宏?

例如,在 DAG 中我有一个 Action :

datestamp = '{{ ds }}'

print(datestamp) # prints string not the date when I run it for any date

scanner = S3KeySensor(
        task_id='scanner',
        poke_interval=60,
        timeout=24 * 60 * 60,
        soft_fail=False,
        wildcard_match=True,
        bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date
        bucket_name=bucketName,
        dag=dag)

因此,当调用扫描器时,“ds”值被替换为预期的执行日期,但我想在其他一些地方使用“ds”值。但在那种情况下,它不会替换值,而是将整个字符串获取为“{{ ds }}”。在上面的例子中。 print 语句打印“{{ ds }}”而不是执行日期。

最佳答案

幸运的是,bucket_key 是模板化的,只需将 jinja 模板放入其中即可。

…
bucket_key=getPath() + '{{ ds }}',
…

完全在运算符之外,您不能使用这些宏。因为该文件由调度程序定期解释,而不仅仅是在 dag 运行期间。那么当 dag 未运行时 ds 的值是多少?

但是,由于您不太可能想在任务之外对其进行任何操作,因此可以将其放入模板化字段中。您还可以扩展另一个要模板化的字段。

class MySensor(S3KeySensor):
    template_fields = ('bucket_key', 'bucket_name', 'my_thing')

    def __init__(self, my_thing=None, *args, **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)
        self.my_thing = my_thing

    def post_execute(self, context):
        logging.info(
           "I probably wanted to over-ride poke to use {}".format(self.my_thing)

scanner = MySensor(
    my_thing='{{ ds }}',
    task_id='scanner',
    poke_interval=60,
    timeout=24 * 60 * 60,
    soft_fail=False,
    wildcard_match=True,
    bucket_key=getPath() + '{{ ds }}',
    bucket_name=bucketName,
    dag=dag)

编辑:IIRC self.my_thing 在初始化后不会改变,相反,context.my_thing 将在 (?pre_execute 和) execute 被调用。

关于hadoop - 在运算符之外使用 Airflow 宏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45064887/

相关文章:

database - 存储和解析文本日志和报告的理想系统

c++ - 我怎样才能只写一次 "#include"?

c++ - 宏不执行功能

使用 Airflow 进行 Azure AD 身份验证

apache-spark - Spark on Yarn 发送 RPC 失败且 Slave 丢失

linux - bashrc 文件意外结束错误/hadoop

java - Hadoop-MapReduce 的小型数据集

Windows 宏 + 热键 - 无格式粘贴

pyspark - 如何在 Databricks 笔记本中使用 Airflow 变量?

python - 如何使用 Cloud composer 将大数据从 Postgres 导出到 S3?