python - 从 Beam 管道连接 google cloud sql postgres 实例

标签 python postgresql google-cloud-sql apache-beam

我想从在 google 数据流上运行的 apache beam 管道连接 google cloud sql postgres 实例。
我想使用 Python SDK 执行此操作。
我无法为此找到合适的文档。
在云 SQL 中如何指导我没有看到任何数据流文档。
https://cloud.google.com/sql/docs/postgres/

有人可以提供文档链接/github 示例吗?

最佳答案

您可以使用来自 beam-nuggetsrelational_db.Writerelational_db.Read 转换如下:

首先安装beam-nuggests:

pip install beam-nuggets

阅读:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)

对于写作:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
        create_if_missing=True,
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True
    )
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )

关于python - 从 Beam 管道连接 google cloud sql postgres 实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47129510/

相关文章:

python - 用于嘈杂的不可微损失函数的自定义 Tensorflow 优化器

Python:通过函数传递memmap数组?

ruby-on-rails - 如何判断一个模型有一个空关系

ruby-on-rails - 将 postgresql 数据库的模式名称从 "public"修改为 "my-project"

python - 在 Windows 64 位上安装 Cloud SQL 代理时出现问题

使用多个分隔符和非罗马字符分割Python字符串

python - 使用Happybase扫描远程hbase表时,出现 'Tsocket read 0 bytes Error'

Postgresql Ubuntu 的奇怪行为(版本不兼容?)

node.js - 使用非标准 unix 套接字进行 Sequelize-cli 迁移

google-cloud-sql - 谷歌云 SQL "Idling IP Address"