我目前正在将之前在 bash 脚本中实现的工作流程转换为 Airflow DAG。在 bash 脚本中,我只是在运行时使用
导出变量export HADOOP_CONF_DIR="/etc/hadoop/conf"
现在我想在 Airflow 中做同样的事情,但还没有找到解决方案。我发现的一种解决方法是在任何方法或运算符之外使用 os.environ[VAR_NAME]='some_text'
设置变量,但这意味着它们会在脚本加载时导出,而不是在运行时导出时间。
现在,当我尝试在由 PythonOperator 调用的函数中调用 os.environ[VAR_NAME] = 'some_text'
时,它不起作用。我的代码看起来像这样
def set_env():
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
os.environ['PATH'] = "somePath:" + os.environ['PATH']
os.environ['SPARK_HOME'] = "pathToSparkHome"
os.environ['PYTHONPATH'] = "somePythonPath"
os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()
set_env_operator = PythonOperator(
task_id='set_env_vars_NOT_WORKING',
python_callable=set_env,
dag=dag)
现在,当我的 SparkSubmitOperator 被执行时,我得到了异常:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
我的相关用例是我有 SparkSubmitOperator
,我在其中将作业提交给 YARN,因此 HADOOP_CONF_DIR
或 YARN_CONF_DIR
必须设置在环境中。遗憾的是,在我的 .bashrc
或任何其他配置中设置它们对我来说是不可能的,这就是为什么我需要在运行时设置它们。
我最好在执行 SparkSubmitOperator
之前在 Operator 中设置它们,但是如果有可能将它们作为参数传递给 SparkSubmitOperator
,那将至少是一些东西。
最佳答案
根据我在 spark submit operator 中看到的内容您可以将环境变量作为字典传递给 spark-submit。
:param env_vars: Environment variables for spark-submit. It
supports yarn and k8s mode too.
:type env_vars: dict
你试过吗?
关于python - 使用 Airflow 在运行时导出环境变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51312425/