我刚开始使用 Airflow(对 Python 也很陌生。)
我需要使用 Airflow 将一些非常大的 MySQL 表迁移到 s3 文件。 Airflow 中的所有相关 Hook 和运算符似乎都适合使用 Pandas 数据帧将完整的 SQL 输出加载到内存中,然后转换/导出为所需的文件格式。
这给大型表带来了明显的问题,这些表无法完全装入内存并出现故障。我认为没有办法让 Airflow 读取查询结果并将其保存到本地文件,而不是将其全部存储到内存中。
我看到了使用 MySqlHook bulk_dump 将结果输出到 MySQL 服务器上的文件的方法,但没有明确的方法来传输该结果文件到 s3(或到 Airflow 本地存储,然后到 s3)。
我有点摸不着头脑,因为我曾在 Pentaho 工作过,它可以轻松处理这个问题,但看不到任何明显的解决方案。
我可以尝试将表分割成足够小的 block ,以便Airflow/Pandas可以处理它们,但这需要大量的工作,大量的查询执行,而且有很多 table 。
将非常大的表从MySQL服务器移动到s3有哪些策略?
最佳答案
如果 Airflow 传输操作器不适合您的规模,您不必使用它们。您可以(并且可能应该)使用适合您流程的逻辑创建您自己的 CustomMySqlToS3Operator
。
几个选项:
- 不要在一项任务中传输所有数据。根据日期/行数/其他对数据进行切片。您可以在工作流程中使用
CustomMySqlToS3Operator
的多个任务。这并不像您提到的那样需要大量工作。这只是为您生成的 SQL 查询提供正确的WHERE
条件的问题。取决于您构建的流程您可以定义每次运行都处理一天的数据,因此您的WHERE
条件很简单date_column在execution_date和next_execution_date之间
(您可以阅读它在 https://stackoverflow.com/a/65123416/14624409 )。然后使用catchup=True
回填运行。 - 使用 Spark 作为运算符的一部分。
- 正如您所指出的,您可以将数据转储到本地磁盘,然后使用 load_file 将其上传到 S3
S3Hook
的方法。这可以作为CustomMySqlToS3Operator
逻辑的一部分来完成,或者如果您更喜欢从PythonOperator
调用 Python。
关于python - 使用 Airflow 迁移大型表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66804890/