我的工作流程是:
- 我从变量
LAST_IMPORTED_ORDER_ID
中获取当前的最大订单 ID - 我正在获取
MySQL
数据库中的最大order_id
- 使用
MySqlToGoogleCloudStorageOperator
将LAST_IMPORTED_ORDER_ID
之间的订单导入到MySQL
的xcom
中的值
到目前为止一切顺利,效果很好。
但是问题是当值之间的差距太大时。可以是 500K
订单。不可能一次导入那么多记录。
MySqlToGoogleCloudStorageOperator
能够使用 approx_max_file_size_bytes
将保存在存储上的文件分成 block ,但它无法将查询本身分块。
基本上我想做的是使用诸如分页之类的东西来进行查询。如果
xcom_order_id - LAST_IMPORTED_ORDER_ID > 50K
然后中断最多 50K 行的查询,这意味着我需要动态创建运算符。
这就是我尝试做的:
LAST_IMPORTED_ORDER_ID = Variable.get("last_order_id_imported")
start_task_op = DummyOperator(task_id='start_task', dag=dag)
def chunck_import(**kwargs):
ti = kwargs['ti']
xcom = int(ti.xcom_pull(task_ids='get_max_order_id_2_months_ago'))
current = int(LAST_IMPORTED_ORDER_ID)
if xcom - current < 50000:
num_pages = 1
else:
num_pages = int((xcom / current) + 1)
logging.info(xcom)
logging.info(current)
for i in range(1, num_pages + 1): #for 1 page its range(1,2)
start = LAST_IMPORTED_ORDER_ID * i
end = start + 50000
if end > xcom:
end = xcom
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders_and_upload_to_storage_orders-{}'.format(i),
mysql_conn_id='mysqlcon',
google_cloud_storage_conn_id='googlecon',
provide_context=True,
approx_max_file_size_bytes=100000000,
sql='select * from e.orders where orders_id between {{ params.start }} and {{ params.end }}',
params={'start': start, 'end': end},
bucket=GCS_BUCKET_ID,
filename=file_name_orders,
dag=dag)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> get_max_order_id_2_months_ago >> chunck_import_op
这没有错误并且运行成功,但什么也没做。
XCOM
中的值是正确的。但 chunk_import_op
不执行任何操作。另外,我在 UI 中没有看到动态创建的 MySqlToGoogleCloudStorageOperator
:
另请注意 print num_pages
我也没有在日志中看到该值。
我该如何解决这个问题?
最佳答案
不幸的是,操作符无法修改其所在的 DAG。由于您只能在操作符执行中拉取 xcom,因此我建议您不要将操作符添加到 DAG,而是在循环设置的末尾up 运算符,在循环内调用:
import_orders_op.pre_execute(**kwargs)
import_orders_op.execute(**kwargs)
这有点困惑,因为所有日志输出都将在任务 chunck_import
中,您可能想为自己逻辑重命名该任务(import_in_chunks
?),但它应该可以工作,并且您的 DAG 不会更改每次运行的任务的确切数量。
或者,我认为这甚至更复杂,假设最大数量的 block ,为每个基于 block 的范围设置一对 ShortCircuitOperator
和 MySqlToGoogleCloudStorageOperator
。 ShortCircuitOperator 应该检查 block 的起始范围是否有效,如果有效则运行 sql 2 gcs 操作,如果无效则短路。
更好的方法是将 MySqlToGoogleCloudStorageOperator 子类化为 PagedMySqlToGCSOperator,覆盖 execute
、_query_mysql
和 _write_local_data_files
。但这还需要更多工作。
关于airflow - 使用 MySqlToGoogleCloudStorageOperator 对记录进行分页,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52753754/