python - 在数据流中包含其他文件

标签 python google-cloud-dataflow apache-beam

我的数据流使用.sql 文件。此文件包含一个查询,它位于名为 queries 的目录中。

我需要将此文件与我的数据流一起上传。

我发现使用了 manifest.in 文件,但据我所知,它没有做任何事情,我在我的根目录中创建了一个名为 MANIFEST.in 的文件,并且它包含一行:

递归包含查询 *

一些其他消息来源告诉我,我需要为此使用 setup.py 文件。所以现在它看起来像这样:

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build

import setuptools  # pylint: disable-all
setuptools.setup(
    name='MarkPackage',
    version='0.0.1',
    install_requires=[],
    packages=setuptools.find_packages(),
    package_data={
        'queries': ['queries/*'],
    },
    include_package_data=True
)

这也行不通。 错误是:RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: 'queries/testquery.sql' [while running 'generatedPtransform-20']

将任何文件包含在我的数据流的任何或所有部分中使用的最佳做法是什么?

最佳答案

此解决方案是由我们的 Google Cloud 顾问提供给我的。它有效但不建议这样做,因为它只是为了将 SQL 查询与 Python 代码分开而增加了复杂性。 另一种方法是在 Bigquery 上创建一个包含此 SQL 代码的 View ,并在 Bigquery 环境中维护它。

MANIFEST.in
include query.sql

setup.py

import setuptools
setuptools.setup(
    name="example",
    version="0.0.1",
    install_requires=[],
    packages=setuptools.find_packages(),
    data_files=[(".", ["query.sql"])],
    include_package_data=True,
)

主.py

with open ("query.sql", "r") as myfile:
        query=myfile.read()
    with beam.Pipeline(argv=pipeline_args) as p:
        rows = p | "ReadFromBQ" >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True)
        )
        rows | "writeToBQ" >> beam.io.Write(
            "BQ Write"
            >> beam.io.WriteToBigQuery(
                known_args.output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )

关于python - 在数据流中包含其他文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59159615/

相关文章:

google-cloud-dataflow - Cloud Dataflow 日志中的 "GC (Allocation Failure)"

google-cloud-dataflow - 带窗口的 GroupByKey 后,Beam 管道不产生任何输出,并且出现内存错误

python - 数据流 : using beam. combiners 上一个 beam.combiners 的结果

python - pip 安装错误 - ImportError : No module named finsymbols

python - 使用 python 中的 selenium 处理 ul 标签内嵌套 div 中的所有文本

google-analytics - 是否可以使用Google Dataflow处理Google Analytics(分析)数据?

ubuntu - 在 Google Dataflow Worker 上配置 Linux Distro?

python - 如何使用可变长度字符串解码 TFRecord 数据样本?

python - Tkinter - 尽管保留全局引用,但图像不会显示在按钮上

google-cloud-platform - 为 Pubsub 到 Bigquery 流构建此 GCP 数据流示例时出错