I am trying to use the SSHOperator to SSH from an Airflow worker into one of my servers. SSH on that server is configured to use Kerberos authentication, and connecting with the default SSH connection produces the error below.
SSH operator error: Bad authentication type; allowed types: ['publickey', 'gssapi-keyex', 'gssapi-with-mic', 'keyboard-interactive']
How can I resolve this error? I tried the following settings in the connection's Extra field in the Airflow UI.
{ "gss-auth":"true","gssapi-keyex":"true","gss-kex":"true"}
Is there an option in Airflow's SSHOperator to specify Kerberos as the authentication type?
Best Answer
Airflow's existing SSHOperator has no Kerberos authentication support, even though the underlying Paramiko library supports it.
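For reference, Paramiko's `SSHClient.connect` accepts a `gss_auth` keyword argument (alongside related options such as `gss_kex`) to enable GSSAPI/Kerberos authentication. A minimal sketch of the arguments involved; the hostname and username are placeholders, and actually connecting also requires Paramiko installed and a valid Kerberos ticket (e.g. from `kinit`):

```python
# Keyword arguments for paramiko.SSHClient.connect to use Kerberos/GSSAPI.
# "edge-node.example.com" and "airflow" are placeholders, not real values.
connect_kwargs = {
    "hostname": "edge-node.example.com",
    "username": "airflow",
    "gss_auth": True,   # authenticate via GSSAPI (Kerberos)
}

# Usage sketch (requires paramiko and a valid Kerberos ticket):
# import paramiko
# client = paramiko.SSHClient()
# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# client.connect(**connect_kwargs)
```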
I was able to work around this by writing a custom hook that extends SSHHook and passes the parameter telling the underlying Paramiko library to use Kerberos as the authentication type. It works! Thanks to Airflow's easy extensibility.
The custom hook:
import paramiko

from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.plugins_manager import AirflowPlugin


class CustomSSHHook(SSHHook):
    """
    Custom SSH hook with Kerberos authentication support.
    """

    def __init__(self,
                 ssh_conn_id=None,
                 remote_host=None,
                 username=None,
                 password=None,
                 key_file=None,
                 port=None,
                 timeout=10,
                 keepalive_interval=30):
        super(CustomSSHHook, self).__init__(
            ssh_conn_id,
            remote_host,
            username,
            password,
            key_file,
            port,
            timeout,
            keepalive_interval)
        # Use the connection's extras to override the default (no Kerberos)
        self.gss_auth = False
        if self.ssh_conn_id is not None:
            conn = self.get_connection(self.ssh_conn_id)
            if conn.extra is not None:
                extra_options = conn.extra_dejson
                if "gss_auth" in extra_options \
                        and str(extra_options["gss_auth"]).lower() == 'true':
                    self.gss_auth = True

    def get_conn(self):
        """
        Opens an SSH connection to the remote host.
        :return: paramiko.SSHClient object
        """
        self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        if self.no_host_key_check:
            # Default is RejectPolicy
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if self.password and self.password.strip():
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           password=self.password,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy)
        else:
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy,
                           gss_auth=self.gss_auth)
        if self.keepalive_interval:
            client.get_transport().set_keepalive(self.keepalive_interval)
        self.client = client
        return client


class CustomSshPlugin(AirflowPlugin):
    name = "ssh_plugins"
    hooks = [CustomSSHHook]
The above can then be used in a DAG as follows:
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.hooks.ssh_plugins import CustomSSHHook

edge_node_hook = CustomSSHHook(ssh_conn_id="ssh_con_id",
                               remote_host=host_ip,
                               port=22,
                               timeout=100)

ssh_execution = SSHOperator(dag=dag,
                            task_id='sample_task',
                            ssh_hook=edge_node_hook,
                            command='whoami',
                            do_xcom_push=False)
And you must add the following parameter to the Extra field of the connection with ID "ssh_con_id".
{"gss_auth":"true"}
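The hook reads this flag with the string comparison shown in `__init__` above; that parsing can be sketched in isolation using only the standard library (the function name `gss_auth_enabled` is illustrative, not part of Airflow):

```python
import json

def gss_auth_enabled(extra):
    """Mirror of the hook's check: parse the connection's Extra JSON
    and treat gss_auth as enabled only when it equals "true"."""
    if not extra:
        return False
    extra_options = json.loads(extra)
    return str(extra_options.get("gss_auth", "")).lower() == "true"

print(gss_auth_enabled('{"gss_auth":"true"}'))   # True
print(gss_auth_enabled('{"gss_auth":"false"}'))  # False
```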
Regarding "python - Airflow SSH operator: how to specify the authentication type", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57614096/