python - 执行运算符后 Airflow 得到结果

标签 python hive airflow

我配置了 airflow 并创建了一些 Dag 和 subDag 来调用多个操作符。

我的麻烦是,当运算符(operator)运行并完成作业时,我想以某种 Python 结构接收返回的结果。 例如:

File1.py

  ...
    ...
    sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path,
       ),
        task_id=DELP_DAG_NAME,
        dag=dag,
    )

File2.py

  from airflow import DAG
    from airflow.operators import HiveOperator
def subdag_callHive(parent, child, args, step,
                         user_defined_macros, path
                        ):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent, child),
            default_args=args,
            schedule_interval="@daily",
            template_searchpath=path,
            user_defined_macros=user_defined_macros,
        )

        # some work...

        HiveOperator(
            task_id='some_id',
            hiveconf_jinja_translate=True,
            hql='select field1 from public.mytable limit 4;',
            trigger_rule='all_done',
            dag=dag_subdag,
        )

        return dag_subdag 

函数 subdag_callHive 是从另一个 python 脚本调用的,其中定义了主 Dag 和所有其他所需的参数。

我只需要能够从 HiveOperator 获得结果 (*select * from public.mytable limit 4;*) 在本例中为 4 个值。

返回的 dag_subdag 是一个对象 并且包含传递给调用的所有属性/数据,但没有关于 HiveOperator 做了什么的信息。

这可能吗?如果是的话,如何实现。

最佳答案

您可以根据需要使用 Hooks。 HiveOperator 基本上做同样的事情,他调用 Hive Hooks,它有多种方法来处理结果。

使用 PythonOperator 调用一个函数,然后启动一个配置单元 Hook 。

以下示例可能对您有所帮助。

代码片段:

callHook = PythonOperator(
    task_id='foo',
    python_callable=do_work,
    dag=dag
)

def do_work():
    hiveserver = HiveServer2Hook()
    hql = "SELECT COUNT(*) FROM foo.bar"
    row_count = hiveserver.get_records(hql, schema='foo')
    print row_count[0][0]

所有可用的方法都可以在这里找到:https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py

关于python - 执行运算符后 Airflow 得到结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38039045/

相关文章:

python - 从 namedtuple 属性创建新列表

python - 哪种形式的 unicode 规范化适合文本挖掘?

shell - 在不退出 hive shell 的情况下终止 hive 查询

hive0.10.0 线程中的异常 "main"java.lang.NoSuchMethodError : org. apache.thrift.EncodingUtils.setBit(BIZ)B

python - Airflow Unittest.cfg 权限问题?

python - 从文件重定向 stdin 时如何读取包含 ctrl-Z 的文件?

python - Bottle.py服务器部署

hive - Apache Impala 中是否有相当于 Hive 的 'explode' 函数的函数?

docker - 如何将 Airflow 主页从 docker 更改为本地系统

python - 属性错误 : 'MSVCCompiler' object has no attribute 'linker_exe'