python - Airflow SSH 运算符(operator)如何指定身份验证类型

标签 python ssh kerberos airflow paramiko

我正在尝试使用 SSHOperator 从 Airflow Worker ssh 登录到我的一台服务器。我的 SSH 被配置为使用 Kerberos 身份验证类型。使用以下额外参数配置的默认 SSH 连接时出现以下错误。

SSH operator error: Bad authentication type; allowed types: ['publickey,'gssapi-keyex',gssapi-with-mic,'keyboard-interactive']

如何解决这个错误。我在 Airflow 用户界面的连接的附加字段中尝试了以下设置。

{ "gss-auth":"true","gssapi-keyex":"true","gss-kex":"true"}

Airflow SSHoperator 中是否有一个选项可以指定要使用的身份验证类型是 Kerberos?

最佳答案

即使底层 Paramiko 库具有该支持,Airflow 的现有 SSHOperator 中也没有 Kerberos 身份验证支持。

我能够通过编写扩展 SSHHook 的自定义 Hook 来解决此问题,该 Hook 将参数传递给底层 Paramiko 库以指定 Kerberos 作为身份验证类型。有效!感谢 Airflow 的易于扩展性。

自定义 Hook :

class CustomSSHHook(SSHHook):
    """
    Custom SSH Hook with kerberose authentication support
    """

    def __init__(self,
                 ssh_conn_id=None,
                 remote_host=None,
                 username=None,
                 password=None,
                 key_file=None,
                 port=None,
                 timeout=10,
                 keepalive_interval=30):
        super(CustomSSHHook, self).__init__(
                 ssh_conn_id,
                 remote_host,
                 username,
                 password,
                 key_file,
                 port,
                 timeout,
                 keepalive_interval)
        # Use connection to override defaults
        self.gss_auth = False
        if self.ssh_conn_id is not None:
            conn = self.get_connection(self.ssh_conn_id)

            if conn.extra is not None:
                extra_options = conn.extra_dejson
                if "gss_auth" in extra_options \
                        and str(extra_options["gss_auth"]).lower() == 'true':
                    self.gss_auth = True

    def get_conn(self):
        """
                Opens a ssh connection to the remote host.

                :return paramiko.SSHClient object
                """

        self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        if self.no_host_key_check:
            # Default is RejectPolicy
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        if self.password and self.password.strip():
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           password=self.password,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy)
        else:
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy,
                           gss_auth=self.gss_auth)

        if self.keepalive_interval:
            client.get_transport().set_keepalive(self.keepalive_interval)

        self.client = client
        return client



class CustomSshPlugin(AirflowPlugin):
    name = "ssh_plugins"
    DEFAULT_PLUGIN = "1.0"
    hooks = [CustomSSHHook]

以上可以在 DAG 中使用,如下所示:

from airflow.hooks.ssh_plugins import CustomSSHHook

edge_node_hook = CustomSSHHook(ssh_conn_id="ssh_con_id",
                         remote_host=host_ip,
                         port=22,
                         timeout=100)
ssh_execution = SSHOperator(dag=dag, task_id='sample_task',
                                          ssh_hook=edge_node_hook,
                                          command='whoami ',
                                          do_xcom_push=False)

并且您必须在 ID 为“ssh_conn_id”的连接的 extras 字段中添加以下参数。

{"gss_auth":"true"}

关于python - Airflow SSH 运算符(operator)如何指定身份验证类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57614096/

相关文章:

python - Django 创建帖子

python - 我可以发送 **kwargs 来代替 requests.post 中的设置参数吗?

通过 SSH 隧道连接 MySQL 数据库

tomcat - Eclipse Virgo 在 EC2 上无故关闭

macos - Laravel Homestead 在 SSH 身份验证方法 : private key on mac 中挂起

windows - Windows XP 和 Unix 中的 kerberos 票证 TGT 和服务票证的路径?

node.js - docker 中使用 kerberos 和 openldap 进行 SSO

python - Pygame 矩形类

python - 在 python matplotlib 中绘制公差线

Java 7 Kerberos 问题 - AES128 损坏的校验和