Airflow plugin not being picked up correctly

Tags: airflow, snowflake-cloud-data-platform

We are using Apache Airflow 1.9.0. I wrote a Snowflake hook as a plugin and placed the hook in the $AIRFLOW_HOME/plugins directory.

$AIRFLOW_HOME
  +--plugins
    +--snowflake_hook2.py

snowflake_hook2.py
# This is the base class for a plugin
from airflow.plugins_manager import AirflowPlugin

# This is necessary to expose the plugin in the Web interface
from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink

# This is the base hook for connecting to a database
from airflow.hooks.dbapi_hook import DbApiHook

# This is the snowflake provided Connector
import snowflake.connector

# This is the default python logging package
import logging

class SnowflakeHook2(DbApiHook):
    """
    Airflow Hook to communicate with Snowflake
    This is implemented as a Plugin
    """
    def __init__(self, connname_in='snowflake_default', db_in='default', wh_in='default', schema_in='default'):
        logging.info('# Connecting to {0}'.format(connname_in))
        self.conn_name_attr = 'snowflake_conn_id'
        self.connname = connname_in
        self.superconn = super().get_connection(self.connname) #gets the values from Airflow

        {SNIP - Connection stuff that works}
        self.cur = self.conn.cursor()

    def query(self, q, params=None):
        """From jmoney's db_wrapper; returns a full list of rows (tuples)"""
        if params is None:  # no params, so no substitution
            self.cur.execute(q)
        else:  # let the driver do the parameter substitution
            self.cur.execute(q, params)
        self.results = self.cur.fetchall()
        self.rowcount = self.cur.rowcount
        self.columnnames = [colspec[0] for colspec in self.cur.description]
        return self.results
    {SNIP - Other class functions}

class SnowflakePluginClass(AirflowPlugin):
    name = "SnowflakePluginModule"
    hooks = [SnowflakeHook2]
    operators = []

So I went ahead and added some print statements to Airflow's plugins_manager to try to get a better handle on what is happening. After restarting the webserver and running airflow list_dags, these lines showed the new module name (and no errors):
SnowflakePluginModule [<class '__home__ubuntu__airflow__plugins_snowflake_hook2.SnowflakeHook2'>]
hook_module -  airflow.hooks.snowflakepluginmodule
INTEGRATING airflow.hooks.snowflakepluginmodule
snowflakepluginmodule <module 'airflow.hooks.snowflakepluginmodule'>

Since this matches what the documentation says, I should be able to use it in my DAG like this:
from airflow import DAG
from airflow.hooks.snowflakepluginmodule import SnowflakeHook2 
from airflow.operators.python_operator import PythonOperator

But the webserver throws this error:
Broken DAG: [/home/ubuntu/airflow/dags/test_sf2.py] No module named 'airflow.hooks.snowflakepluginmodule'

So the question is: what am I doing wrong? Or have I uncovered a bug?

Best Answer

You need to import it like this:

from airflow import DAG
from airflow.hooks import SnowflakeHook2 
from airflow.operators.python_operator import PythonOperator

or

from airflow import DAG
from airflow.hooks.SnowflakePluginModule import SnowflakeHook2 
from airflow.operators.python_operator import PythonOperator

A similar question about Airflow plugins not being picked up correctly can be found on Stack Overflow: https://stackoverflow.com/questions/49883840/
