I want to use GCSToSFTPOperator in my GCP Composer environment. We are on Airflow version 1.10.3 (composer-1.8.3-airflow-1.10.3; I already upgraded the environment from 1.10.2 to 1.10.3). GCSToSFTPOperator exists only in the latest version of Airflow.
See the reference below:
https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html
I also tried the plugin approach: I copied the GCSToSFTPOperator class source code into a file in the plugins folder and imported it in my Python DAG, but now I get a "No module named 'airflow.gcp'" error. I then tried installing the gcp 0.2.1 PyPI package in the Composer environment, but that installation fails as well.
Step 1 - Create the DAG code and place it in the DAG folder:
from airflow import models
from airflow.utils.dates import days_ago

# GCSToSFTPOperator class copied into the plugins folder (see Step 2)
from PluginGCSToSFTPOperator import GCSToSFTPOperator

default_args = {"start_date": days_ago(1)}

BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"

with models.DAG(
    "example_gcs_to_sftp",
    default_args=default_args,
    schedule_interval=None,
    tags=["example"],
) as dag:
    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket=BUCKET_SRC,
        source_object=OBJECT_SRC,
        destination_path=DESTINATION_PATH,
    )

    copy_file_from_gcs_to_sftp
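For context, the `from PluginGCSToSFTPOperator import GCSToSFTPOperator` line works because Composer puts the plugins folder on `sys.path`, so a file named `PluginGCSToSFTPOperator.py` placed there is importable as a plain module. A minimal sketch of that mechanism, with a temporary directory standing in for the plugins folder and a stub class standing in for the real operator:

```python
# Sketch: why a plain module import from the plugins folder resolves.
# A temp directory stands in for Composer's plugins/ folder, and a stub
# class stands in for the copied operator.
import os
import sys
import tempfile

plugins_dir = tempfile.mkdtemp()  # stands in for the Composer plugins/ folder
with open(os.path.join(plugins_dir, "PluginGCSToSFTPOperator.py"), "w") as f:
    f.write("class GCSToSFTPOperator:\n    pass\n")  # placeholder for the real class

# Composer adds plugins/ to sys.path automatically; simulated here by hand
sys.path.append(plugins_dir)
from PluginGCSToSFTPOperator import GCSToSFTPOperator

print(GCSToSFTPOperator.__name__)  # GCSToSFTPOperator
```

This style of import does not require registering an AirflowPlugin subclass; the file only has to be a valid Python module in the plugins folder.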
Step 2 - Copy the GCSToSFTPOperator class code into a Python file and place that file in the plugins folder:
import os
from tempfile import NamedTemporaryFile
from typing import Optional

# from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GCSHook
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"


class GCSToSFTPOperator(BaseOperator):
    template_fields = ("source_bucket", "source_object", "destination_path")
    ui_color = "#f0eee4"

    # pylint: disable=too-many-arguments
    @apply_defaults
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_path: str,
        move_object: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        sftp_conn_id: str = "ssh_default",
        delegate_to: Optional[str] = None,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to
        self.sftp_dirs = None

    def execute(self, context):
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
        )
        sftp_hook = SFTPHook(self.sftp_conn_id)

        if WILDCARD in self.source_object:
            total_wildcards = self.source_object.count(WILDCARD)
            if total_wildcards > 1:
                raise AirflowException(
                    "Only one wildcard '*' is allowed in source_object parameter. "
                    "Found {} in {}.".format(total_wildcards, self.source_object)
                )

            prefix, delimiter = self.source_object.split(WILDCARD, 1)
            objects = gcs_hook.list(
                self.source_bucket, prefix=prefix, delimiter=delimiter
            )

            for source_object in objects:
                destination_path = os.path.join(self.destination_path, source_object)
                self._copy_single_object(
                    gcs_hook, sftp_hook, source_object, destination_path
                )

            self.log.info(
                "Done. Uploaded '%d' files to %s", len(objects), self.destination_path
            )
        else:
            destination_path = os.path.join(self.destination_path, self.source_object)
            self._copy_single_object(
                gcs_hook, sftp_hook, self.source_object, destination_path
            )
            self.log.info(
                "Done. Uploaded '%s' file to %s", self.source_object, destination_path
            )

    def _copy_single_object(
        self,
        gcs_hook: GCSHook,
        sftp_hook: SFTPHook,
        source_object: str,
        destination_path: str,
    ) -> None:
        """
        Helper function to copy single object.
        """
        self.log.info(
            "Executing copy of gs://%s/%s to %s",
            self.source_bucket,
            source_object,
            destination_path,
        )

        dir_path = os.path.dirname(destination_path)
        sftp_hook.create_directory(dir_path)

        with NamedTemporaryFile("w") as tmp:
            gcs_hook.download(
                bucket_name=self.source_bucket,
                object_name=source_object,
                filename=tmp.name,
            )
            sftp_hook.store_file(destination_path, tmp.name)

        if self.move_object:
            self.log.info(
                "Executing delete of gs://%s/%s", self.source_bucket, source_object
            )
            gcs_hook.delete(self.source_bucket, source_object)
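The wildcard branch of `execute()` above can be illustrated standalone: the source object is split once on `*`, the left part becomes the GCS listing prefix and the right part the delimiter, and each matched object name is joined onto the destination path. The pattern and object name below are hypothetical examples:

```python
import os

WILDCARD = "*"
source_object = "data/parent-*.bin"     # hypothetical wildcard pattern
destination_root = "/tmp/single-file/"  # matches DESTINATION_PATH in the DAG

# Same split the operator performs before calling gcs_hook.list()
prefix, delimiter = source_object.split(WILDCARD, 1)
print(prefix)     # data/parent-
print(delimiter)  # .bin

# For each object returned by the listing, the remote SFTP path is built
# by joining the object name onto the destination path
matched_object = "data/parent-1.bin"    # hypothetical listing result
destination_path = os.path.join(destination_root, matched_object)
print(destination_path)  # /tmp/single-file/data/parent-1.bin
```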
Step 3 - I also tried putting the same file in the DAG folder, and I get the same error: "No module named 'airflow.gcp'".
What should I try next? Is there an alternative operator, or any other way to use GCSToSFTPOperator with Airflow version 1.10.3?
Best answer
The documentation you are looking at is for Airflow 1.10.7, which is the latest release. If you look at the Airflow 1.10.2 documentation instead, you will see that the gcs_to_sftp operator does not exist in that version.
What you can try is to copy the operator's code, turn it into a plugin, and put the code in the plugins directory of your Composer instance's bucket. If you still run into problems, please share all the steps you have taken and I will do my best to help.
You can also read more about upgrading the Airflow version in Composer.
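Regarding the "No module named 'airflow.gcp'" error from the question: the `airflow.gcp` and `airflow.providers` packages do not exist in Airflow 1.10.3, so the copied operator's imports have to be rewritten to their `airflow.contrib` equivalents. A sketch of the substitutions involved (the 1.10.x paths below are my best understanding of that release's layout; verify them against your installed version):

```python
# Import paths used by the copied operator (latest Airflow) mapped to
# their assumed Airflow 1.10.3 equivalents -- verify locally before use.
IMPORT_REWRITES = {
    "airflow.gcp.hooks.gcs.GCSHook":
        "airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook",
    "airflow.providers.sftp.hooks.sftp_hook.SFTPHook":
        "airflow.contrib.hooks.sftp_hook.SFTPHook",
    "airflow.AirflowException":
        "airflow.exceptions.AirflowException",
}

for modern, legacy in sorted(IMPORT_REWRITES.items()):
    print("{} -> {}".format(modern, legacy))
```

Note that rewriting the imports alone may not be enough: as far as I can tell, the contrib GCS hook's `download()` takes `bucket` and `object` arguments rather than `bucket_name` and `object_name`, so the keyword arguments inside `_copy_single_object()` would need adjusting as well.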
About google-cloud-platform - "How can we use GCSToSFTPOperator in GCP Composer environment?": a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59769709/