python - Apache Beam 本地 Python 依赖项

标签 python google-cloud-dataflow apache-beam

我有本地 Python 包,我想在 Apache Beam 管道内部与 DataFlow Runner 一起使用。 我尝试按照文档中提供的说明进行操作:https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ (本地或非 PyPI 依赖项部分)但没有成功。

我的包具有以下结构:

my_common
├── __init__.py
└── shared
    ├── __init__.py
    └── something.py

something.py 文件包含:

def hello_world():
    return "Hello"

包是使用python setup.py sdist命令构建的。

现在,我的 Apache Beam 管道配置如下:

pipeline_parameters = [
    '--project', project_id,
    '--staging_location', staging_location,
    '--temp_location', temp_location,
    '--max_num_workers', 1,
    "--extra_package", "/absolute/path/to/my/package/my_common-1.0.tar.gz"
]


p = beam.Pipeline("DataFlowRunner", argv=pipeline_parameters)
# rest of the pipeline definition

其中一个管道映射函数具有以下代码,它使用我的模块:

from my_common.shared import something
logging.info(something.hello_world())

每次我将此管道调度到 DataFlow 时,都会收到以下错误:

ImportError: No module named shared

有趣的是,当我在另一个环境中安装此软件包(来自 .tar.gz)文件时,我可以毫无问题地从中导入并运行函数。在我看来,DataFlow 在运行管道之前不会安装该包。

管理本地 Python 依赖项并将其部署到 Google DataFlow 的正确方法是什么?

//更新: https://stackoverflow.com/a/46605344/1955346中描述的解决方案对于我的用例来说是不够的,因为我需要将本地包放在完全不同的文件夹中,并且我的管道的 setup.py 已经有一些内容(我无法按照建议使用外部包的 setup.py那里)。

最佳答案

不通过extra-packages提供它,而是使用setup_file提供它

使用setuptools定义您的setup_file,它看起来有点像下面

from setuptools import setup

setup(
    name="dataflow_pipeline_dependencies",
    version="1.0.0",
    author="Marcin Zablocki",
    author_email="<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="6d14021808000c04012d0902000c0403430e0200" rel="noreferrer noopener nofollow">[email protected]</a>",
    description=("Custom python utils needed for dataflow cloud runner"),
    packages=[
        'my_common'
        ]
)

并使用--setup_file参数传递它,如下所示

pipeline_parameters = [
    '--project', project_id,
    '--staging_location', staging_location,
    '--temp_location', temp_location,
    '--max_num_workers', 1,
    "--setup_file", "/absolute/path/to/your/package/my_common"
]


p = beam.Pipeline("DataFlowRunner", argv=pipeline_parameters)
# rest of the pipeline definition

其中/absolute/path/to/your/package/my_common存储包的目录的路径

关于python - Apache Beam 本地 Python 依赖项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46604870/

相关文章:

python - "gcloud app deploy"不覆盖静态文件夹

python - 在 python 2 或 3 中不使用 '+' 运算符的两个整数之和

python - 将 scipy 对象保存到文件

java - 通过在 Google 云上运行 Dataflow 作业,在 VM 实例上提供自定义标签和元数据

java - 设计输出多个文件(包括二进制输出)的 Apache Beam 转换的理想方法是什么?

python - 在 Python Dataflow/Apache Beam 上启动 CloudSQL 代理

python - 在 Sublime 中设置插入符位置

python - 从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名

google-cloud-dataflow - 数据流 wordcount.py 示例 "Import by filename is not supported"

apache-beam - 调用 API 是否违反 Apache Beam 编程模型?