airflow - 使用 MySqlToGoogleCloudStorageOperator 对记录进行分页

标签 airflow

我的工作流程是:

  1. 我从变量 LAST_IMPORTED_ORDER_ID 中获取当前的最大订单 ID
  2. 我正在获取 MySQL 数据库中的最大 order_id
  3. 使用 MySqlToGoogleCloudStorageOperatorLAST_IMPORTED_ORDER_ID 之间的订单导入到 MySQLxcom 中的值

到目前为止一切顺利,效果很好。

但是问题是当值之间的差距太大时。可以是 500K 订单。不可能一次导入那么多记录。

MySqlToGoogleCloudStorageOperator 能够使用 approx_max_file_size_bytes 将保存在存储上的文件分成 block ,但它无法将查询本身分块。

基本上我想做的是使用诸如分页之类的东西来进行查询。如果
xcom_order_id - LAST_IMPORTED_ORDER_ID > 50K 然后中断最多 50K 行的查询,这意味着我需要动态创建运算符。

这就是我尝试做的:

LAST_IMPORTED_ORDER_ID = Variable.get("last_order_id_imported")

start_task_op = DummyOperator(task_id='start_task', dag=dag)

def chunck_import(**kwargs):
    ti = kwargs['ti']
    xcom = int(ti.xcom_pull(task_ids='get_max_order_id_2_months_ago'))
    current = int(LAST_IMPORTED_ORDER_ID)
    if xcom - current < 50000:
        num_pages = 1
    else:
        num_pages = int((xcom / current) + 1)
    logging.info(xcom)
    logging.info(current)
    for i in range(1, num_pages + 1):  #for 1 page its range(1,2)
        start = LAST_IMPORTED_ORDER_ID * i
        end = start + 50000
        if end > xcom:
            end = xcom
        import_orders_op = MySqlToGoogleCloudStorageOperator(
            task_id='import_orders_and_upload_to_storage_orders-{}'.format(i),
            mysql_conn_id='mysqlcon',
            google_cloud_storage_conn_id='googlecon',
            provide_context=True,
            approx_max_file_size_bytes=100000000,
            sql='select * from e.orders where orders_id between {{ params.start }} and {{ params.end }}',
            params={'start': start, 'end': end},
            bucket=GCS_BUCKET_ID,
            filename=file_name_orders,
            dag=dag)


chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)

start_task_op >> get_max_order_id_2_months_ago >> chunck_import_op

这没有错误并且运行成功,但什么也没做。

XCOM 中的值是正确的。但 chunk_import_op 不执行任何操作。另外,我在 UI 中没有看到动态创建的 MySqlToGoogleCloudStorageOperator:

enter image description here

另请注意 print num_pages 我也没有在日志中看到该值。

我该如何解决这个问题?

最佳答案

不幸的是,操作符无法修改其所在的 DAG。由于您只能在操作符执行中拉取 xcom,因此我建议您不要将操作符添加到 DAG,而是在循环设置的末尾up 运算符,在循环内调用:

import_orders_op.pre_execute(**kwargs)
import_orders_op.execute(**kwargs)

这有点困惑,因为所有日志输出都将在任务 chunck_import 中,您可能想为自己逻辑重命名该任务(import_in_chunks?),但它应该可以工作,并且您的 DAG 不会更改每次运行的任务的确切数量。

或者,我认为这甚至更复杂,假设最大数量的 block ,为每个基于 block 的范围设置一对 ShortCircuitOperatorMySqlToGoogleCloudStorageOperator 。 ShortCircuitOperator 应该检查 block 的起始范围是否有效,如果有效则运行 sql 2 gcs 操作,如果无效则短路。

更好的方法是将 MySqlToGoogleCloudStorageOperator 子类化为 PagedMySqlToGCSOperator,覆盖 execute_query_mysql_write_local_data_files。但这还需要更多工作。

关于airflow - 使用 MySqlToGoogleCloudStorageOperator 对记录进行分页,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52753754/

相关文章:

python - 如何使用 Cloud composer 将大数据从 Postgres 导出到 S3?

python-3.x - 如何使用 REST API 触发 Airflow dag(我得到 "Property is read-only - ' state'”,错误)

python - 安装 Apache Airflow 后出错

oozie - 子任务中的 AirFlow dag id 访问

Airflow -D 不工作,不在后台运行

apache-spark - 如何在 Airflow 中将参数传递给 DataprocSubmitJobOperator?

kubernetes - 如何使用 nginx-ingress-controller Kubernetes 暴露 localhost Airflow

google-cloud-pubsub - Apache Airflow 与 Google 云 pubsub 的集成

python - 将参数从 BranchPythonOperator 传递到 PythonOperator

airflow - Apache Airflow : airflow initdb throws ModuleNotFoundError: No module named 'werkzeug.wrappers.json' ; 'werkzeug.wrappers' is not a package error