python - 使用 Airflow 迁移大型表

标签 python mysql airflow

我刚开始使用 Airflow(对 Python 也很陌生。)

我需要使用 Airflow 将一些非常大的 MySQL 表迁移到 s3 文件。 Airflow 中的所有相关 Hook 和运算符似乎都适合使用 Pandas 数据帧将完整的 SQL 输出加载到内存中,然后转换/导出为所需的文件格式。

这给大型表带来了明显的问题,这些表无法完全装入内存并出现故障。我认为没有办法让 Airflow 读取查询结果并将其保存到本地文件,而不是将其全部存储到内存中。

我看到了使用 MySqlHook bulk_dump 将结果输出到 MySQL 服务器上的文件的方法,但没有明确的方法来传输该结果文件到 s3(或到 Airflow 本地存储,然后到 s3)。

我有点摸不着头脑,因为我曾在 Pentaho 工作过,它可以轻松处理这个问题,但看不到任何明显的解决方案。

我可以尝试将表分割成足够小的 block ,以便Airflow/Pandas可以处理它们,但这需要大量的工作,大量的查询执行,而且有很多 table 。

将非常大的表从MySQL服务器移动到s3有哪些策略?

最佳答案

如果 Airflow 传输操作器不适合您的规模,您不必使用它们。您可以(并且可能应该)使用适合您流程的逻辑创建您自己的 CustomMySqlToS3Operator

几个选项:

  1. 不要在一项任务中传输所有数据。根据日期/行数/其他对数据进行切片。您可以在工作流程中使用 CustomMySqlToS3Operator 的多个任务。这并不像您提到的那样需要大量工作。这只是为您生成的 SQL 查询提供正确的 WHERE 条件的问题。取决于您构建的流程您可以定义每次运行都处理一天的数据,因此您的WHERE条件很简单date_column在execution_date和next_execution_date之间(您可以阅读它在 https://stackoverflow.com/a/65123416/14624409 )。然后使用 catchup=True 回填运行。
  2. 使用 Spark 作为运算符的一部分。
  3. 正如您所指出的,您可以将数据转储到本地磁盘,然后使用 load_file 将其上传到 S3 S3Hook 的方法。这可以作为 CustomMySqlToS3Operator 逻辑的一部分来完成,或者如果您更喜欢从 PythonOperator 调用 Python。

关于python - 使用 Airflow 迁移大型表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66804890/

相关文章:

docker - Airflow从私有(private)谷歌容器存储库中提取docker镜像

python - 如何将 PostgreSQL 查询结果传递给 Airflow 中的变量? (Postgres Operator 或 Postgres Hook)

python - 如何有效地向量化超几何 CDF 计算?

python - Django - 如何在不包含 URL 前缀的情况下渲染 View ?

python - 格式{:%}: Add space between digit and percentage

Mysql 触发器不工作。为什么?

php - 在 MySQL 数据库中存储今天的日期

python - 使用 python 在 Azure Functions 中进行路由

Python 列表元素交换未按预期工作

php - PHP 中列的总和