python - 将模板变量传递给 HiveOperator

标签 python airflow

我有一个 jinja 模板,计划将其用于 Hive 中的动态 SQL 生成。我的模板如下所示:

USE {{ db }};

CREATE EXTERNAL TABLE IF NOT EXISTS foo (
    A int,
    B int
)
stored as parquet
location ‘….’;

“db”是可以通过函数调用派生的东西。我决定编写一个扩展 HiveExecOperator 的运算符。在我的环境中,类层次结构是:

BaseOperator <—— BaseExecOperator <—— HiveExecOperator

我的 TestHive 运算符如下所示:

class TestHive(HiveExecOperator):
    def pre_execute(self, context):
        context[‘db’] = func1(…,,)
        return context['ti'].render_templates()

这个不起作用,因为模板内的 {{ db }} 没有得到任何东西,并且 hive 语句失败。我还尝试在 TestHive 中覆盖 render_template,如下所示:

class TestHive(HiveExecOperator):
    def render_template(self, attr, content, context):
    context['db'] = func1(..,)
    return super(TestHive, self).render_templates(attr, content, context)

此方法失败,因为 TestHive 的父类没有 render_templates 方法。

方法:render_templates”仅在BaseOperator中定义。

感谢任何帮助。

最佳答案

假设您指的是 HiveOperator 而不是 HiveExecOperator,并查看您所描述的内容,我认为您不需要在这里派生任何类型的运算符。除非有一些我没有看到的额外丢失信息,否则您只是询问如何将函数调用的值作为参数传递到模板化命令中。

HiveOperatorhql 参数是 template field 。这意味着您应该能够像已经完成的那样简单地定义模板,然后将值作为 Operator 调用的一部分提供给它。但请记住在传入的变量前加上 params 前缀。请参阅:

my_query= """
    USE {{ params.db }};

    CREATE EXTERNAL TABLE IF NOT EXISTS foo (
    A int,
    B int
    )
    stored as parquet
    location .......
    """

run_hive_query = HiveOperator(
    task_id="my_task",
    hql=my_query,
    params={ 'db': func1(...) },
    dag=dag
)

关于python - 将模板变量传递给 HiveOperator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52709621/

相关文章:

python - 将唯一对象列表与自定义函数进行比较

python - 当我尝试转换 Keras MLP 时,为什么 Google Colab 会给出 "unknown device"错误?

python - 命令 "python setup.py egg_info"失败,错误代码为 1

python - Web服务在Airflow Docker容器中不起作用

amazon-ec2 - 为什么 Apache airflow 使用命令 : 'airflow initdb' ? 失败

Airflow 身份验证设置失败, "AttributeError: can' t set 属性”

python - 如果函数花费的时间太长,则跳过循环?

python - 已知结构矩阵的 NumPy 矩阵乘法效率

python - 如何正确构建自引用 SQLalchemy 邻接列表混合

linux - 组用户名将 root 显示为组但不允许删除