amazon-s3 - Airflow/minio : How do I use minio as a local S3 proxy for data sent from Airflow?

标签 amazon-s3 google-cloud-storage airflow minio

简单的问题:

我想知道如何使用 minio 作为本地 S3 代理来保存 Airflow 发送的数据,而不是使用 S3 或 GCS。我该怎么做呢?我真的可以使用 FileToGoogleCloudStorageOperator 吗?

如果不是用于本地存储(大型图像而不是数据库行)的这条路线,您会推荐什么?

谢谢!

最佳答案

基于 similar answer 构建,这就是我在撰写本文时使用最新版本的 Airflow (1.10.7) 所做的:

首先,使用以下信息创建 S3 连接:

Connection Name: '<your connection name>' #  e.g. local_minio
Connection Type: S3
Extra: a JSON object with the following properties: 
 {
    "aws_access_key_id":"your_minio_access_key",
    "aws_secret_access_key": "your_minio_secret_key",
    "host": "http://127.0.0.1:9000"
 }

接下来,在您的 DAG 中,创建一个使用 S3Hook 与数据交互的任务。下面是一个示例,您可以根据自己的需要进行调整:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.S3_hook import S3Hook

DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 13),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('create_date_dimension', default_args=DEFAULT_ARGS,
          schedule_interval="@once")


def write_text_file(ds, **kwargs):
    with open("/tmp/test.txt", "w") as fp:
        # Add file generation/processing step here, E.g.:
        fp.write(ds)

        # Upload generated file to Minio
        s3 = S3Hook('local_minio')
        s3.load_file("/tmp/test.txt",
                     key=f"my-test-file.txt",
                     bucket_name="my-bucket")


# Create a task to call your processing function
t1 = PythonOperator(
    task_id='generate_and_upload_to_s3',
    provide_context=True,
    python_callable=write_text_file,
    dag=dag
)

关于amazon-s3 - Airflow/minio : How do I use minio as a local S3 proxy for data sent from Airflow?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55529401/

相关文章:

Airflow 1.10 - 任务之间的长时间延迟

angular - S3 静态托管 - 没有 index.html 的路径

python - 从 Flask 中的 S3 返回 PDF

ios - 不使用 Oauth 的 Google Json Api

ruby-on-rails - 如何使用 Active Storage 保留存储空间和加载时间?

python - Airflow 没有正确安排 Python

python - 在运算符(operator)之外访问 Airflow 默认变量

javascript - 允许使用 Amazon S3 的 OPTIONS HTTP 方法

amazon-s3 - 如何修复 AWS S3 上损坏的 Delta Lake 表

google-app-engine - 使用 create_gs_key 时,blob 存储会创建 blobInfo 吗?google app engine