python - 在 Airflow 上使用 DataprocOperator 的组件网关

标签 python google-cloud-platform airflow google-cloud-dataproc

在 GCP 中,安装和运行 JupyterHub component 相当简单。从用户界面或 gcloud 命令。我正在尝试通过 Airflow 和 DataprocClusterCreateOperator 编写进程脚本,这里是 DAG 的摘录

from airflow.contrib.operators import dataproc_operator  

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME, 
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        optional_components=['JUPYTER', 'ANACONDA']
    )

但是我无法指定所需的 enable-component-gateway范围。查看源代码,似乎参数不是有意的(在 deprecatedlast stable 运算符中)。

我知道 REST API 提供了 endpointConfig.enableHttpPortAccess ,但我更愿意使用官方运营商。
有谁知道如何实现这一目标?

最佳答案

编辑,一个适用于 composer-1.8.3 的修复,带有 Airflow 1.10.3

在 Airflow 1.10.3 中,无法在外部创建集群配置。但是我们可以继承集群创建操作符并覆盖配置创建。这也让我们可以设置可选组件,这是 Airflow 版本中缺少的一个参数。

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
        return cluster_data

#Start DataProc Cluster
dataproc_cluster = CustomDataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME, 
    cluster_name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    num_masters=1,
    master_machine_type='n1-standard-2',
    worker_machine_type='n1-standard-2',
    master_disk_size=100,
    worker_disk_size=100,
    storage_bucket='test-dataproc-jupyter', 
    region='europe-west4', 
    zone='europe-west4-a',
    auto_delete_ttl=21600, 
    dag=dag
)

原始答案,适用于 Airflow 1.10.7

虽然不是最优的,但您可以自己创建 Cluster 数据结构,而不是让 Airflow 的 ClusterGenerator 来这样做。它应该适用于最新版本(1.10.7)

cluster = {
  'clusterName': CLUSTER_NAME,
  'config': {
    'gceClusterConfig': {
      'zoneUri': 'europe-west4-a'
    },
    'masterConfig': {
      'numInstances': 1,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'workerConfig': {
      'numInstances': 3,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'softwareConfig': {
      'optionalComponents': [
        'ANACONDA',
        'JUPYTER'
      ]
    },
    'lifestyleConfig': {
      'autoDeleteTtl': 21600
    },
    'endpointConfig': {
      'enableHttpPortAccess': True
    }
  },
  'projectId': PROJECT_ID
}
#Start DataProc Cluster
dataproc_cluster = DataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    region='europe-west4', 
    zone='europe-west4-a',
    cluster = cluster,
    dag=DAG
)

如果您使用的是其他 Airflow 版本,请指明。

您也可以为我打开的错误投票:AIRFLOW-6432

关于python - 在 Airflow 上使用 DataprocOperator 的组件网关,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59568043/

相关文章:

google-cloud-platform - 如何使环境变量作为python sdk中的环境变量到达Dataflow worker

python - 如何从 MYSQL DB 中仅捕获新记录

python - 日期的格式字符串在转换为十进制日和月时无效

python - 使用 numpy 的批量张量乘法

python - 用于小特征值的 Scipy 稀疏 eigsh()

search - 如何搜索 Google 云存储桶?

mysql - 错误 2003 (HY000) : can't connect to MySQL server on ' IPv4 address' (10060)

python - 单元测试 : Does it make sense to test parent object methods?

django - (Django) Airflow 中的 ORM - 有可能吗?

ubuntu - Airflow 任务: OSError: [Errno 23] Too many open files in system