streaming - Dataflow 流管道中 DoFn 的实例过多

标签 streaming google-cloud-dataflow apache-beam google-cloud-sql

我目前正在开发一个数据流流管道,它与 Cloud SQL 有许多交互。 管道通过 Python connector developed by Google 与 Cloud SQL 中的 Postgres 实例进行交互。 。 它通过继承自DoFn类“CloudSQLDoFn”的DoFn函数进行连接,该类通过setup()和teardown()调用处理连接池。 总共,我们有 16 个继承自 CloudSQLDoFn 类的 DoFns。

import apache_beam as beam
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy import create_engine

INSTANCE_CONNECTION_NAME = ########
DB_USER = #########
DB_PASS = #########
DB_NAME = #########
POOL_SIZE = 5

class CloudSqlDoFn(beam.DoFn):
    def __init__(
        self,
        local
    ):
        self.local = local

        self.connected_pool = None

        self.instance_connexion_name = INSTANCE_CONNECTION_NAME
        self.db_user = DB_USER
        self.db_pass = DB_PASS
        self.db_name = DB_NAME

        self.pool_size = POOL_SIZE

    def get_conn(self):
        """Create connexion"""

        conn = Connector().connect(
            self.instance_connexion_name,
            "pg8000",
            user=self.db_user,
            password=self.db_pass,
            db=self.db_name,
            ip_type=IPTypes.PRIVATE
        )
        return conn

    def get_pool(self):
        """Create pool of connexion"""
        pool = create_engine(
            "postgresql+pg8000://",
            creator=self.get_conn,
            pool_size=self.pool_size,
            pool_recycle=1800
        )
        return pool

    def setup(self):
        """Open connection or pool of connections to Postgres"""
        self.connected_pool = self.get_pool()
     

    def teardown(self):
        """Close connection to Postgres"""
        self.connected_pool.dispose()

总而言之,我们面临着一个典型的“背压”问题:当太多文件到达时,我们从“Cloud SQL 管理服务器”(设置 SQL 连接)收到大量“太多请求”错误。同一时间。

运行时错误:aiohttp.client_exceptions.ClientResponseError:429,消息='请求太多',url=URL('https://sqladmin.googleapis.com/sql/v1beta4/projects/.../instances/db-csql:generateEphemeralCert')

我们知道这是由于创建了许多 DoFns 实例,这些实例正在调用 setup() 方法,因此请求了太多连接,但我们无法控制连接数量。 我们认为,通过限制工作线程和线程的最大数量,我们可以强制增加延迟(这没问题),但似乎其他参数决定了 DoFn 的实例数量。

我的问题:

  • 除了线程和工作线程的数量之外,什么因素决定了流式数据流中同时实例化的 DoFn 实例的数量?
  • 我们如何强制系统接受较高的延迟/较低的新鲜度,以免 Cloud SQL 管理服务器饱和?

感谢您的帮助。

最佳答案

您可以将池设置为进程级(即将其附加到某个全局静态变量/模块,或 DoFn 类本身)并在所有 DoFn 实例之间共享它,以限制每个进程的连接数,而不管 DoFn 的数量实例化。如果您需要其中多个,您可以为每个 DoFn 指定一个唯一标识符,然后获得 id 到池的静态映射。

在 Dataflow 上,您还可以设置 no_use_multiple_sdk_containers 来限制每个工作虚拟机的进程数量(当然这也会限制管道其他部分的 CPU)。

关于streaming - Dataflow 流管道中 DoFn 的实例过多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75101788/

相关文章:

google-cloud-platform - GCP Pubsub 中的消息丢失和重复

google-cloud-dataflow - 使用 Dataflow 读取 CSV header

python - Google Cloud Platform 数据流集成

android - 将视频从android流式传输到桌面

ios - swift - 像 Instagram 应用程序一样预下载视频

google-cloud-dataflow - 您如何通过发布/订阅将旧数据重播到数据流中并保持正确的事件时间逻辑?

javascript - 从 Cloud Function 触发 Cloud Dataflow 管道 - 函数超时

kubernetes - Kafka 集群丢失或重复消息

java - vlcj libvlc 流媒体桌面在一段时间后停止

javascript - 使用套接字 IO 和 Node JS 通过套接字流式传输麦克风