go - 在Apache Beam Dataflow管道中控制并行度

标签 go google-cloud-dataflow apache-beam

我们正在尝试使用Apache Beam(使用Go SDK)和Dataflow并行化一项耗时的任务。对于更多的上下文,我们有一个缓存工作,它接受一些查询,在数据库中运行它并缓存它们。每个数据库查询可能需要几秒钟到几分钟,我们希望并行运行这些查询以更快地完成任务。
创建了一个简单的管道,如下所示:

    // Create initial PCollection.
    startLoad := beam.Create(s, "InitialLoadToStartPipeline")

    // Emits a unit of work along with query and date range.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)

    // Emits a cache response which includes errCode, errMsg, time etc.
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)

    ...
getCachePayloadsFn发出的数量单位不是很多,在生产中将以数百为单位,最多为数千。
现在的问题是cacheQueryDoFn不会并行执行,查询正在逐个顺序执行。我们通过在缓存功能中记录goroutine ID,进程ID,开始和结束时间等以将日志分别放入StartBundleProcessElement中来确认这一点,以确认执行没有重叠。
即使只有10个查询,我们也希望始终并行运行查询。根据我们的理解和文档,它从总体输入中创建 bundle 包,这些 bundle 包并行运行,并且在 bundle 包中依次运行。有没有一种方法可以控制负载中束的数量,或者有什么方法可以增加并行度?
我们尝试过的事情:
  • 保留num_workers=2autoscaling_algorithm=None。它启动两个VM,但运行Setup方法以仅在一个VM上初始化DoFn,并将其用于整个负载。
  • 找到了sdk_worker_parallelism选项here。但是不确定如何正确设置它。尝试使用beam.PipelineOptions.Set("sdk_worker_parallelism", "50")设置它。没有效果。
  • 最佳答案

    默认情况下,Create不是并行的,并且所有DoFns都与Create融合在同一阶段,因此它们也没有并行性。有关更多信息,请参见https://beam.apache.org/documentation/runtime/model/#dependent-parallellism
    您可以使用Reshuffle转换显式强制融合中断。

    关于go - 在Apache Beam Dataflow管道中控制并行度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65608212/

    相关文章:

    java - 在Dataflow中手动发送PubSub消息

    google-cloud-dataflow - 谷歌数据流 Apache Beam

    google-cloud-platform - 如何计算Google Dataflow文件处理的输入文件中的行数?

    json - 恒定时间搜索深度嵌套的 JSON 数据

    amazon-web-services - 如何在 docker 容器内使用主机的 IAM

    java - 使用 Java API/Dataflow 将重复记录插入 Big Query - "Repeated field must be imported as a JSON array"

    java - 如何在 kubernetes 环境中使用 spark 配置 beam python sdk

    c# - 如何禁用 .NET Core http 客户端中的 SSL 证书检查?

    go - CGo:如何将二维 slice 传递给 C 函数

    java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以写入 Parquet 文件?