python - Cloud Composer Airflow raises error: Broken DAG: cannot import name '_parse_data' when importing new dag

Tags: python google-cloud-platform google-bigquery google-cloud-storage airflow

I'm trying to create a DAG in Cloud Composer. When it is imported, I get the following error:

Broken DAG: [/home/airflow/gcs/dags/airflow_bigquery_v12.py] cannot import name _parse_data
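This message is Python's standard `ImportError` for a `from module import name` statement where the module itself is found but no longer defines `name`. The sketch below reproduces the same message with the standard-library `json` module. `_parse_data` is used there only to mirror the error above (`json` has no such name), and `reproduce_missing_name_error` is a hypothetical helper.

```python
def reproduce_missing_name_error():
    """Return the message Python produces for `from <module> import <missing name>`."""
    try:
        # The json module exists, but it defines no `_parse_data`,
        # so this raises a plain ImportError rather than ModuleNotFoundError.
        from json import _parse_data  # noqa: F401
    except ImportError as exc:
        return str(exc)
    return None


print(reproduce_missing_name_error())
```

Because the module is found and only the name is missing, the usual fix is to install a version of the package that still provides the name, or a version of the calling code that no longer asks for it.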

Here is the DAG file. As you can see, it tries to copy a Cloud Storage file into BigQuery:

# Import the datetime class and timedelta directly; every date helper below
# uses them without a module prefix.
from datetime import datetime, timedelta

from airflow import DAG, models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
# Midnight at the start of yesterday.
YESTERDAY = datetime.combine(
    datetime.today() - timedelta(1),
    datetime.min.time())

DEFAULT_ARGS = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': YESTERDAY,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    # Read from the Airflow Variable "gcp_project", which must already exist.
    'project_id': models.Variable.get('gcp_project')
}


with DAG('airflow_bigquery_v12',
         default_args=DEFAULT_ARGS,
         schedule_interval=timedelta(days=1),
         catchup=False
         ) as dag:

    # Tasks created inside the `with` block are attached to `dag` automatically.
    start_task = DummyOperator(task_id="start")
    end_task = DummyOperator(task_id="end")

    # Load newline-delimited JSON from GCS into BigQuery, using a schema file
    # stored in the same bucket.
    gcs_to_bigquery_rides = GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_BigQuery_stage',
        bucket='my_bucket',
        destination_project_dataset_table='misc.pg_rides_json_airflow',
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=['rides_new.json'],
        #ignore_unknown_values = True,
        #schema_fields=dc(),
        schema_object='rides_schema.json',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        #skip_leading_rows = 1,
        google_cloud_storage_conn_id='google_cloud_storage_default',
        bigquery_conn_id='bigquery_default'
    )

    start_task >> gcs_to_bigquery_rides >> end_task
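Airflow reports "Broken DAG" when importing a file from the DAGs folder raises an exception, so the same failure can usually be caught before uploading, by importing the file in a local environment with the same package versions. Below is a minimal standard-library sketch; `find_import_error` is a hypothetical helper name. It only executes the module, whereas Airflow's own `DagBag(...).import_errors` also runs Airflow's DAG discovery. Note that this particular DAG file also calls `models.Variable.get`, which needs an initialized Airflow metadata database when run locally.

```python
import importlib.util
import traceback


def find_import_error(path):
    """Execute the Python file at `path` as a module.

    Returns None if it imports cleanly, otherwise the last line of the
    traceback, e.g. "ImportError: cannot import name ...".
    """
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except Exception:  # Airflow flags any exception, not only ImportError
        return traceback.format_exc().strip().splitlines()[-1]
    return None
```

For example, `find_import_error("airflow_bigquery_v12.py")` would be expected to return a line mentioning `cannot import name` if the installed Airflow and pandas-gbq versions clash as described in the answer.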

For reference, here is the rides_new.json file located in "my_bucket", which contains the schema of the table to be created:

[
  {
    "mode": "NULLABLE",
    "name": "finish_picture_state",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_picture_file_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_reason",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "starting_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "finished_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "created_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "ending_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "state",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "currency",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "amount",
        "type": "INTEGER"
      }
    ],
    "mode": "NULLABLE",
    "name": "cost",
    "type": "RECORD"
  },
  {
    "mode": "NULLABLE",
    "name": "stoped_since",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "minutes",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "vehicle_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "distance",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "service_area_id",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "base",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "per_minute",
        "type": "RECORD"
      }
    ],
    "mode": "NULLABLE",
    "name": "pricing",
    "type": "RECORD"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "m",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "latitude",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "longitude",
        "type": "FLOAT"
      }
    ],
    "mode": "REPEATED",
    "name": "path",
    "type": "RECORD"
  }
]
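A schema file like this can be sanity-checked before the load job runs. The sketch below assumes only the structure visible in the file (each field has `name`, `type` and an optional `mode`; `RECORD` fields carry a nested `fields` list). `check_schema` and `check_schema_file` are hypothetical helpers, and `KNOWN_TYPES` is limited to the types used in this file rather than BigQuery's full list. For example, a nested field that has `fields` but no `type` is reported.

```python
import json

# Types used in this schema file; BigQuery accepts more (e.g. BOOLEAN, DATE).
KNOWN_TYPES = {"STRING", "INTEGER", "FLOAT", "TIMESTAMP", "RECORD"}
KNOWN_MODES = {"NULLABLE", "REQUIRED", "REPEATED"}


def check_schema(fields, prefix=""):
    """Return a list of problems found in a BigQuery-style JSON schema."""
    problems = []
    for field in fields:
        path = prefix + (field.get("name") or "<unnamed>")
        ftype = field.get("type")
        mode = field.get("mode", "NULLABLE")
        if "name" not in field:
            problems.append(f"{path}: missing 'name'")
        if ftype is None:
            problems.append(f"{path}: missing 'type'")
        elif ftype not in KNOWN_TYPES:
            problems.append(f"{path}: unexpected type {ftype!r}")
        if mode not in KNOWN_MODES:
            problems.append(f"{path}: unexpected mode {mode!r}")
        if ftype == "RECORD" and not field.get("fields"):
            problems.append(f"{path}: RECORD without nested 'fields'")
        # Recurse so problems deep inside nested records are found too.
        problems.extend(check_schema(field.get("fields", []), prefix=path + "."))
    return problems


def check_schema_file(path):
    """Load and check a schema file; json.load raises json.JSONDecodeError
    on syntax errors such as a trailing comma."""
    with open(path) as f:
        return check_schema(json.load(f))
```

Usage would be something like `print(check_schema_file("rides_schema.json"))`, with an empty list meaning no problems were detected.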

Thanks a lot for your help.

Best answer

`_parse_data` no longer exists as of pandas-gbq 0.10.0; it was removed in this commit:

https://github.com/pydata/pandas-gbq/commit/ebcbfbe1fecc90ac9454751206115adcafe4ce24#diff-4db670026d33c02e5ad3dfbd5e4fd595L664

Airflow stopped using `_parse_data` in releases after 1.10.0:

https://github.com/apache/airflow/commit/8ba86072f9c5ef81933cd6546e7e2f000f862053#diff-ee06f8fcbc476ea65446a30160c2a2b2L27

So you need to either:

  • upgrade apache-airflow to a version later than 1.10.0, or

  • downgrade pandas-gbq to a version earlier than 0.10.0.
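The rule above can be expressed as a small check on installed versions. This sketch assumes, following the answer, that Airflow releases up to and including 1.10.0 import `_parse_data` and that pandas-gbq 0.10.0 and later no longer provide it. `parse_version`, `is_incompatible` and `installed_versions_are_incompatible` are hypothetical helpers; the parser only looks at the leading digits of the first three dot-separated parts, and `importlib.metadata` requires Python 3.8+.

```python
import re
from importlib.metadata import version  # Python 3.8+


def parse_version(text):
    """Turn '1.10.0', '1.10.0+composer' or '0.10.0rc1' into (major, minor, patch)."""
    parts = []
    for piece in text.split(".")[:3]:
        match = re.match(r"\d+", piece)  # keep only the leading digits
        if match is None:
            break
        parts.append(int(match.group()))
    while len(parts) < 3:  # pad '1.10' to (1, 10, 0)
        parts.append(0)
    return tuple(parts)


def is_incompatible(airflow_version, pandas_gbq_version):
    """True when Airflow <= 1.10.0 (still imports _parse_data) is paired with
    pandas-gbq >= 0.10.0 (no longer provides it), per the answer above."""
    return (parse_version(airflow_version) <= (1, 10, 0)
            and parse_version(pandas_gbq_version) >= (0, 10, 0))


def installed_versions_are_incompatible():
    """Check the current environment; raises importlib.metadata.PackageNotFoundError
    if either package is not installed."""
    return is_incompatible(version("apache-airflow"), version("pandas-gbq"))
```

In Cloud Composer, a pandas-gbq pin such as `pandas-gbq<0.10.0` would presumably be applied through the environment's PyPI package settings.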

Source: a similar question on Stack Overflow, "python - Cloud Composer Airflow raises error: Broken DAG: cannot import name '_parse_data' when importing new dag": https://stackoverflow.com/questions/55925800/
