google-cloud-platform - Google Cloud Composer (Airflow) - Dataflow job inside the DAG executes successfully, but the DAG fails

Tags: google-cloud-platform airflow directed-acyclic-graphs google-cloud-composer

My DAG looks like this:

import datetime

import airflow
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'dataflow_default_options': {
        'project': 'test',
        'tempLocation': 'gs://test/dataflow/pipelines/temp/',
        'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '1',
        'region': 'asia-east1'
    }
}

dag = DAG(
    dag_id='gcs_avro_to_bq_dag',
    default_args=default_args,
    description='ETL for loading data from GCS(present in the avro format) to BQ',
    schedule_interval=None,
    dagrun_timeout=datetime.timedelta(minutes=30))

task = DataFlowJavaOperator(
    task_id='gcs_avro_to_bq_flow_job',
    jar='gs://test/dataflow/pipelines/jobs/test-1.0-SNAPSHOT.jar',
    poll_sleep=1,
    options={
        'input': '{{ ts }}',
    },
    dag=dag)


My DAG executes a jar file. The jar contains the code that runs a Dataflow job which writes data from GCS to BQ. The jar itself executes successfully.
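
For reference, here is a rough sketch of what DataFlowJavaOperator effectively does (illustrative only, not the actual hook code; the rendered value for {{ ts }} is made up): it merges dataflow_default_options with the task-level options, stages the jar locally, runs it as a subprocess, and then polls the Dataflow API for the launched job.

import subprocess

# Illustrative sketch of DataFlowJavaOperator's behaviour; the real logic lives in
# airflow/contrib/hooks/gcp_dataflow_hook.py. Values mirror the DAG above.
options = {
    'project': 'test',
    'region': 'asia-east1',
    'jobName': 'gcs_avro_to_bq_flow_job',
    'input': '2020-05-20T17:20:00+00:00',  # example of the rendered {{ ts }}
}

# The jar is first downloaded from GCS to a local temp path, then executed.
cmd = ['java', '-jar', '/tmp/test-1.0-SNAPSHOT.jar'] + [
    '--{}={}'.format(key, value) for key, value in options.items()
]
subprocess.check_call(cmd)  # the Beam pipeline itself launches and succeeds

# The hook then scrapes the subprocess logs for the Dataflow job ID and polls the
# Dataflow API until that job finishes; that polling step is where this DAG fails.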

When I try to execute the Airflow job, I see the following error:
[2020-05-20 17:20:41,934] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,840] {gcp_api_base_hook.py:97} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-05-20 17:20:41,937] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,853] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest
[2020-05-20 17:20:44,338] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:44,338] {discovery.py:873} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json
[2020-05-20 17:20:45,285] {__init__.py:1631} ERROR - <HttpError 404 when requesting https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json returned "(7e83a8221abb0a9b): Information about job asia-east1 could not be found in our system. Please double check the id is correct. If it is please contact customer support.">
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1491, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 184, in execute
    self.jar, self.job_class)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 220, in start_java_dataflow
    self._start_dataflow(variables, name, command_prefix, label_formatter)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 286, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 200, in _start_dataflow
    self.poll_sleep, job_id).wait_for_done()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 44, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 63, in _get_job
    jobId=self._job_id).execute(num_retries=5)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)
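
The 404 in the traceback is raised by the job-status poll. A minimal sketch of that poll, assuming the google-api-python-client Dataflow v1b3 API that the hook uses (project and region are the values from default_args; credentials come from the environment):

from googleapiclient.discovery import build

# Sketch of the status poll that returns the 404 above.
dataflow = build('dataflow', 'v1b3', cache_discovery=False)
job = dataflow.projects().locations().jobs().get(
    projectId='test',
    location='asia-east1',
    jobId='asia-east1',  # the hook somehow ends up using the region as the job ID
).execute(num_retries=5)  # -> HttpError 404: job "asia-east1" could not be found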

I did some more digging and I can see that Airflow is calling the following API: https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1
As you can see, the last path segment after jobs is asia-east1, so it looks like the Airflow task is using the region I supplied in default_args as the job ID when it polls for the Dataflow job's status. I'm not sure that is actually what is happening, but I wanted to note the observation. Am I missing something in my DAG? My Java job logic looks like this:
public class GcsAvroToBQ {

    public interface Options extends PipelineOptions {
        @Description("Input")
        ValueProvider<String> getInput();

        void setInput(ValueProvider<String> value);
    }

    /**
     * Main entry point for executing the pipeline.
     *
     * @param args The command-line arguments to the pipeline.
     */
    public static void main(String[] args) {

        GcsAvroToBQ.Options options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(GcsAvroToBQ.Options.class);

        options.getJobName();

        run(options);
    }

    public static PipelineResult run(Options options) {
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // My Pipeline logic to read Avro and upload to BQ

        PCollection<TableRow> tableRowsForBQ; // Data to store in BQ
        tableRowsForBQ.apply(
                BigQueryIO.writeTableRows()
                        .to(bqDatasetName)
                        .withSchema(fieldSchemaListBuilder.schema())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));


        return pipeline.run();
    }
}

Best answer

This is a confirmed bug in version 2.20.0 of the SDK:

https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/hooks/dataflow.py#L47
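
What appears to happen, as a hedged illustration (the exact pattern and log strings vary by Airflow and Beam version): the hook extracts the Dataflow job ID from the monitoring-console URL that the Beam runner logs, and Beam 2.20.0 changed that URL so the segment after /jobs/ is the region rather than the job ID, which matches the "job asia-east1" seen in the question.

import re

# Approximations of the hook's job-ID pattern and of the console URLs logged by the
# Beam Dataflow runner before and after 2.20.0 (illustrative, not verbatim).
JOB_ID_PATTERN = re.compile(r'.*console\.cloud\.google\.com/dataflow.*/jobs/([a-zA-Z0-9_-]+).*')

beam_2_19_log = ('https://console.cloud.google.com/dataflow/jobsDetail/locations/'
                 'asia-east1/jobs/2020-05-20_10_20_41-1234567890?project=test')
beam_2_20_log = ('https://console.cloud.google.com/dataflow/jobs/'
                 'asia-east1/2020-05-20_10_20_41-1234567890?project=test')

print(JOB_ID_PATTERN.search(beam_2_19_log).group(1))  # the real job ID
print(JOB_ID_PATTERN.search(beam_2_20_log).group(1))  # 'asia-east1' -- region mis-read as job ID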

Please use version 2.19.0 instead; it should work as expected.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>2.19.0</version>
  <scope>runtime</scope>
</dependency>
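
Note that this pins the Dataflow runner (and, transitively, the Beam SDK) used to build the pipeline jar, so the jar referenced by the DAG has to be rebuilt with 2.19.0 and re-uploaded to GCS for the downgrade to take effect.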

Regarding "google-cloud-platform - Google Cloud Composer (Airflow) - Dataflow job inside the DAG executes successfully, but the DAG fails", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61919610/
