python - Google Dataflow - Unable to import custom Python modules

Tags: python google-cloud-dataflow apache-beam

My Apache Beam pipeline implements custom Transforms and ParDo's in Python modules, which in turn import other modules I wrote. On the local runner this works fine, since all the files are available on the same path. With the Dataflow runner, the pipeline fails with a module import error.

How do I make my custom modules available to all the Dataflow workers? Please advise.

Here is an example:

ImportError: No module named DataAggregation

    at find_class (/usr/lib/python2.7/pickle.py:1130)
    at find_class (/usr/local/lib/python2.7/dist-packages/dill/dill.py:423)
    at load_global (/usr/lib/python2.7/pickle.py:1096)
    at load (/usr/lib/python2.7/pickle.py:864)
    at load (/usr/local/lib/python2.7/dist-packages/dill/dill.py:266)
    at loads (/usr/local/lib/python2.7/dist-packages/dill/dill.py:277)
    at loads (/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py:232)
    at apache_beam.runners.worker.operations.PGBKCVOperation.__init__ (operations.py:508)
    at apache_beam.runners.worker.operations.create_pgbk_op (operations.py:452)
    at apache_beam.runners.worker.operations.create_operation (operations.py:613)
    at create_operation (/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py:104)
    at execute (/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py:130)
    at do_work (/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py:642)

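The traceback above comes from the worker deserializing the pipeline: pickle stores a function only as a (module, name) reference, so a worker that cannot import that module fails exactly like this. Here is a minimal sketch of the mechanism; the module name `DataAggregation` is taken from the error above, but its contents are invented for illustration:

```python
import importlib
import os
import pickle
import sys
import tempfile

# Create a throwaway module, standing in for one of the custom files.
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "DataAggregation.py"), "w") as f:
    f.write("def aggregate(values):\n    return sum(values)\n")

sys.path.insert(0, tmp)
importlib.invalidate_caches()
import DataAggregation

# Pickling keeps only a reference: ("DataAggregation", "aggregate").
payload = pickle.dumps(DataAggregation.aggregate)

# Simulate a Dataflow worker that never received the module.
sys.path.remove(tmp)
del sys.modules["DataAggregation"]
importlib.invalidate_caches()

try:
    pickle.loads(payload)  # worker-side deserialization
except ImportError as exc:
    print("worker fails with:", exc)
```

Shipping the files as an installable package, as the accepted answer below describes, makes the module importable on every worker so the reference can be resolved.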
Best Answer

The issue is probably that you haven't grouped your files as a package. The Beam documentation has a section on this.

Multiple File Dependencies

Often, your pipeline code spans multiple files. To run your project remotely, you must group these files as a Python package and specify the package when you run your pipeline. When the remote workers start, they will install your package. To group your files as a Python package and make it available remotely, perform the following steps:

  1. Create a setup.py file for your project. The following is a very basic setup.py file.

    import setuptools

    setuptools.setup(
        name='PACKAGE-NAME',
        version='PACKAGE-VERSION',
        install_requires=[],
        packages=setuptools.find_packages(),
    )
    
  2. Structure your project so that the root directory contains the setup.py file, the main workflow file, and a directory with the rest of the files. Note that the directory must contain an __init__.py file, since setuptools.find_packages() only discovers directories that are Python packages.

    root_dir/
        setup.py
        main.py
        other_files_dir/
            __init__.py
    

See Juliaset for an example that follows this required project structure.

  3. Run your pipeline with the following command-line option:

    --setup_file /path/to/setup.py
    
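In context, a full Dataflow invocation might look like the following sketch; the project, region, and bucket values are placeholders you would replace with your own:

```shell
python main.py \
  --runner DataflowRunner \
  --project YOUR-PROJECT \
  --region YOUR-REGION \
  --temp_location gs://YOUR-BUCKET/tmp \
  --setup_file ./setup.py
```

Beam uses the setup.py file to build a source distribution of your package, which is staged and installed on each worker before it starts processing.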

Note: If you created a requirements.txt file and your project spans multiple files, you can get rid of the requirements.txt file and instead, add all packages contained in requirements.txt to the install_requires field of the setup call (in step 1).
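The note above can be sketched in code: parse the lines of requirements.txt and pass them to install_requires instead of shipping a separate requirements file. The package names here are only illustrative; in practice you would read the string from your actual requirements.txt:

```python
# Illustrative contents of a requirements.txt file
# (in practice: requirements = open("requirements.txt").read()).
requirements = """
# pinned pipeline dependencies
google-cloud-bigquery==1.24.0
pandas>=0.25
"""

# Keep non-empty, non-comment lines.
install_requires = [
    line.strip()
    for line in requirements.splitlines()
    if line.strip() and not line.strip().startswith("#")
]
print(install_requires)  # ['google-cloud-bigquery==1.24.0', 'pandas>=0.25']

# In setup.py, replace install_requires=[] with the parsed list:
#     setuptools.setup(..., install_requires=install_requires, ...)
```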

Regarding python - Google Dataflow - unable to import custom Python modules, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51262031/
