python - 在 Python 中安排 Google Cloud Dataflow 作业

标签 python google-cloud-dataflow

目前这些是 options安排我知道的数据流作业的执行:

  • 使用 App Engine Cron 服务或云函数。

    • example就是用Java,有没有官方的例子跟Python一样简单?
    • example与 Python 一起使用,但我不确定目前是否仍然是一个不错的选择或已“弃用”
  • 来自 Compute Engine 中的 cron 作业

    • 有这方面的教程吗?
  • 在流式管道中使用窗口化

    • 我认为这是最简单的,但从总成本来看是不是最好的想法?
  • Scheduler

    • 这是一个有效的方法吗?

最佳答案

我使用 App Engine Flex 作为我的数据流启动器。这个微服务有端点来按需启动数据流作业,cron 也可以命中。

这是我的项目结构:

df_tasks/
- __init__.py
- datastore_to_csv.py
- ...other_piplines
__init__.py
dflaunch.yaml
main.py
setup.py <-- used by pipelines

对我来说,这样做的诀窍是正确设置我的管道依赖项。即,将 setup.py 用于管道依赖项。像这个例子一样设置它最有帮助: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset

设置.py:

import setuptools

setuptools.setup(
    name='dataflow_python_pipeline',
    version='1.0.0',
    description='DataFlow Python Pipeline',
    packages=setuptools.find_packages(),
)

我在 df_tasks 中的管道配置如下所示:

pipeline_options = PipelineOptions.from_dictionary({
        'project': project,
        'runner': 'DataflowRunner',
        'staging_location': bucket_path+'/staging',
        'temp_location': bucket_path+'/temp',
        'setup_file': './setup.py'
    })

然后在main.py中:

from df_tasks import datastore_to_csv

project_id = os.environ['GCLOUD_PROJECT']

@app.route('/datastore-to-csv', methods=['POST'])
def df_day_summary():
    # Extract Payload
        payload = request.get_json()
        model = payload['model']
        for_date = datetime.datetime.strptime(payload['for_date'], '%Y/%m/%d')
    except Exception as e:
        print traceback.format_exc()
        return traceback.format_exc()
    # launch the job
    try:
        job_id, job_name = datastore_to_csv.run(
            project=project_id,
            model=model,
            for_date=for_date,
        )
        # return the job id
        return jsonify({'jobId': job_id, 'jobName': job_name})
    except Exception as e:
        print traceback.format_exc()
        return traceback.format_exc()

关于python - 在 Python 中安排 Google Cloud Dataflow 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53703189/

相关文章:

python - SQL 类型错误 : can't concat tuple to bytes

python - 使用 ctypes 通过引用传递整数数组

python - 和有什么区别!和 % 在 Jupyter 笔记本中?

python - 使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

google-cloud-platform - 允许用户对 gcp 数据流项目进行写访问

google-cloud-dataflow - Google 数据流 GroupByKey 可以处理热键吗?

google-cloud-storage - 文本IO。使用模式 {} 从 GCS 读取多个文件

google-cloud-storage - 属性错误 : 'AuthorizedSession' object has no attribute 'configure_mtls_channel'

python - 访问 python json key 时需要传递参数

python - HTTPS 请求 CurlAsyncHTTPClient 内存泄漏