google-cloud-platform - How can we use GCSToSFTPOperator in a GCP Composer environment?

Tags: google-cloud-platform airflow google-cloud-composer

I want to use GCSToSFTPOperator in my GCP Composer environment. The environment runs Airflow 1.10.3 (image composer-1.8.3-airflow-1.10.3; I upgraded it from 1.10.2 to 1.10.3). GCSToSFTPOperator exists in the latest version of Airflow; see the reference below: https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html

I also tried the plugin approach: I copied the GCSToSFTPOperator class source code into a file in the plugins folder and imported it in my Python DAG, but now I get an airflow.gcp error. I also tried installing the gcp 0.2.1 PyPI package in the Composer environment, but that failed with an installation error.
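A quick way to confirm which hook modules are actually importable in the environment is a probe like the one below. This is only a sketch; the contrib module paths are assumptions based on the Airflow 1.10.x package layout, while the first path is the one used by the copied code.

import importlib

# Probe the import path used by the copied operator versus the
# locations that ship with Airflow 1.10.x (contrib packages).
candidates = [
    "airflow.gcp.hooks.gcs",            # path referenced by the copied code
    "airflow.contrib.hooks.gcs_hook",   # GCS hook location in Airflow 1.10.x
    "airflow.contrib.hooks.sftp_hook",  # SFTP hook location in Airflow 1.10.x
]

for module_name in candidates:
    try:
        importlib.import_module(module_name)
        print(module_name, "-> importable")
    except ImportError as exc:
        print(module_name, "-> missing:", exc)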

Step 1 - Create the DAG code and place it in the DAG folder

import os
from airflow import DAG
from airflow import models
from PluginGCSToSFTPOperator import GCSToSFTPOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}

BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"


with models.DAG(
    "example_gcs_to_sftp", default_args=default_args, schedule_interval=None, 
    tags=['example']
) as dag:

    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket=BUCKET_SRC,
        source_object=OBJECT_SRC,
        destination_path=DESTINATION_PATH,
    )

    copy_file_from_gcs_to_sftp
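Note that the operator falls back to sftp_conn_id="ssh_default", so that connection has to exist in the environment. For a quick local test, one option (a sketch only; the host and credentials below are placeholders, and in Composer you would normally create the connection through the Airflow UI or CLI instead) is Airflow's AIRFLOW_CONN_<CONN_ID> environment-variable convention:

import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables as
# connection URIs, so this defines the "ssh_default" connection.
# Hypothetical SFTP endpoint; replace with real credentials.
os.environ["AIRFLOW_CONN_SSH_DEFAULT"] = "ssh://sftp-user:sftp-password@sftp.example.com:22"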

Step 2 - Copy the GCSToSFTPOperator class code, paste it into a Python file, and place that file in the plugins folder.

import os
from tempfile import NamedTemporaryFile
from typing import Optional

#from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GCSHook
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"

class GCSToSFTPOperator(BaseOperator):

    template_fields = ("source_bucket", "source_object", "destination_path")

    ui_color = "#f0eee4"

    # pylint: disable=too-many-arguments
    @apply_defaults
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_path: str,
        move_object: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        sftp_conn_id: str = "ssh_default",
        delegate_to: Optional[str] = None,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to
        self.sftp_dirs = None

    def execute(self, context):
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
        )

        sftp_hook = SFTPHook(self.sftp_conn_id)

        if WILDCARD in self.source_object:
            total_wildcards = self.source_object.count(WILDCARD)
            if total_wildcards > 1:
                raise AirflowException(
                    "Only one wildcard '*' is allowed in source_object parameter. "
                    "Found {} in {}.".format(total_wildcards, self.source_object)
                )

            prefix, delimiter = self.source_object.split(WILDCARD, 1)
            objects = gcs_hook.list(
                self.source_bucket, prefix=prefix, delimiter=delimiter
            )

            for source_object in objects:
                destination_path = os.path.join(self.destination_path, source_object)
                self._copy_single_object(
                    gcs_hook, sftp_hook, source_object, destination_path
                )

            self.log.info(
                "Done. Uploaded '%d' files to %s", len(objects), self.destination_path
            )
        else:
            destination_path = os.path.join(self.destination_path, self.source_object)
            self._copy_single_object(
                gcs_hook, sftp_hook, self.source_object, destination_path
            )
            self.log.info(
                "Done. Uploaded '%s' file to %s", self.source_object, destination_path
            )

    def _copy_single_object(
        self,
        gcs_hook: GCSHook,
        sftp_hook: SFTPHook,
        source_object: str,
        destination_path: str,
    ) -> None:
        """Helper function to copy single object."""
        self.log.info(
            "Executing copy of gs://%s/%s to %s",
            self.source_bucket,
            source_object,
            destination_path,
        )

        dir_path = os.path.dirname(destination_path)
        sftp_hook.create_directory(dir_path)

        with NamedTemporaryFile("w") as tmp:
            gcs_hook.download(
                bucket_name=self.source_bucket,
                object_name=source_object,
                filename=tmp.name,
            )
            sftp_hook.store_file(destination_path, tmp.name)

        if self.move_object:
            self.log.info(
                "Executing delete of gs://%s/%s", self.source_bucket, source_object
            )
            gcs_hook.delete(self.source_bucket, source_object)

Step 3 - I also tried placing the same file in the DAG folder, but I get the same error: "No module named 'airflow.gcp'".

What should I try now? Is there an alternative operator, or is there any other way to use this GCSToSFTPOperator with Airflow 1.10.3?

Best Answer

The documentation you are looking at is for Airflow 1.10.7, the latest version. If you check the Airflow 1.10.2 documentation, you will see that the gcs_to_sftp operator does not exist in that version.

What you can try is to copy the code, make it a plugin, and put the code in the plugins directory of the Composer instance's bucket. If you still run into problems, please share all the steps you have taken and I will try to help you.
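For the copied code to import cleanly on Airflow 1.10.3, the hooks referenced by the newer source need to be swapped for the ones that ship with 1.10.x. Below is a minimal sketch of such an adaptation (single object only, no wildcard handling); it assumes the contrib hook locations and call signatures of Airflow 1.10.3, where the GCS hook takes bucket/object arguments rather than bucket_name/object_name, and the plugin class name is just an example:

import os
from tempfile import NamedTemporaryFile

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook  # 1.10.x GCS hook
from airflow.contrib.hooks.sftp_hook import SFTPHook               # 1.10.x SFTP hook
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class GCSToSFTPOperator(BaseOperator):
    """Backport-style copy of gcs_to_sftp for Airflow 1.10.3 (single object only)."""

    template_fields = ("source_bucket", "source_object", "destination_path")

    @apply_defaults
    def __init__(self, source_bucket, source_object, destination_path,
                 move_object=False, gcp_conn_id="google_cloud_default",
                 sftp_conn_id="ssh_default", delegate_to=None, *args, **kwargs):
        super(GCSToSFTPOperator, self).__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
        )
        sftp_hook = SFTPHook(self.sftp_conn_id)

        destination_path = os.path.join(self.destination_path, self.source_object)
        sftp_hook.create_directory(os.path.dirname(destination_path))

        with NamedTemporaryFile("w") as tmp:
            # The 1.10.x hook uses bucket/object, not bucket_name/object_name.
            gcs_hook.download(self.source_bucket, self.source_object, tmp.name)
            sftp_hook.store_file(destination_path, tmp.name)

        if self.move_object:
            gcs_hook.delete(self.source_bucket, self.source_object)


class GCSToSFTPPlugin(AirflowPlugin):
    # Registering the operator exposes it under
    # airflow.operators.gcs_to_sftp_plugin as well.
    name = "gcs_to_sftp_plugin"
    operators = [GCSToSFTPOperator]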

You can also read more about upgrading the Airflow version in Composer.

Regarding "google-cloud-platform - How can we use GCSToSFTPOperator in a GCP Composer environment?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59769709/
