google-cloud-platform - Dataflow 如何自动扩展和分配工作负载?

标签 google-cloud-platform google-cloud-dataflow apache-beam

阅读后this question ,我对Dataflow/Apache Beam 如何分配工作负载仍有一些疑问。我遇到的问题可以用以下代码演示:

package debug;

import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {
    @SuppressWarnings("serial")
    public static PipelineResult main(String[] args) throws IOException {

        /*******************************************
         * SETUP - Build options.
         ********************************************/

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        // Autoscaling will scale between n/15 and n workers, so from 1-15 here
        options.setMaxNumWorkers(15);
        // Default of 250GB is absurdly high and we don't need that much on every worker
        options.setDiskSizeGb(32);
        // Manually configure scaling (i.e. 1 vs 5 for comparison)
        options.setNumWorkers(5);

        // Debug Pipeline
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings()
                    .fromSubscription("your subscription"))
            // this is the transform that I actually care about. In production code, this will
            // send a REST request to some 3rd party endpoint.
            .apply("sleep", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws InterruptedException {
                    Thread.sleep(500);
                    c.output(c.element());
                }
            }));

        return pipeline.run();
    }
}

比较使用 1 个 worker 和 5 个 worker 时的最大吞吐量,而不是后者的效率提高了 5 倍,只是稍微提高了一点点。这让我想知道以下问题:
  • 假设每个工作人员使用 4 个 vCPU,每个线程是否绑定(bind)到特定的 DoFn,或者如果需要提高性能,可以在给定时刻在所有线程上调用相同的 DoFn?
  • 假设有不止一个 worker ,每个 worker 都会得到一个完整的管道,即每个 Transform 至少有一个实例,包括源?
  • Dataflow/Apache Beam 如何确定更频繁地调用哪个转换?是否会创建更多占用 CPU 资源的 DoFn 实例?上墙时间更长?或者每个 Transform 都被复制了相同的时间?
  • 根据Apache programming guide ,即后端相当于 asynchronous “job” .这是否意味着每个 DoFn 实例都是异步处理的?
  • 同样,在提供的示例代码中,如何异步处理“ sleep ”转换?
  • 在生产代码中,Thread.sleep替换为对第 3 方 API 的同步 http 请求。异步过程是否意味着它将同步客户端转换为异步?

  • 更新

    还有一个额外的问题:
    Dataflow documentation有一个关于 PubSubIO 的评论:

    In extreme cases (i.e. Cloud Pub/Sub subscriptions with large publishing batches or sinks with very high latency), autoscaling is known to become coarse-grained.



    你能不能扩展一下:
  • 什么 大批量出版意思?即大批量或大批量?
  • 是否高延迟接收器 在接收器之前的转换中包括高延迟?
  • 什么是粗粒行为?
  • 最佳答案

  • DoFns 可以在给定时刻在所有线程上调用,(look in the blue star)
  • 是的,每个 worker 将处理一个完整的管道
  • Cloud Dataflow 服务执行各种 optimizations : 融合和组合
  • 那些可以,没有数据依赖的步骤
  • “ sleep ”可以在不同的 worker 上同时处理并按顺序发送到队列中。
  • 取决于数据依赖
  • 关于google-cloud-platform - Dataflow 如何自动扩展和分配工作负载?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51657291/

    相关文章:

    google-bigquery - Google-cloud-dataflow:无法通过 `WriteToBigQuery/BigQuerySink` 和 `BigQueryDisposition.WRITE_TRUNCATE` 将 json 数据插入 bigquery

    java - GCP Dataflow-从存储中读取 CSV 文件并写入 BigQuery

    java - 如何获取 PCollection<String, String> 中的元素总数

    java - Apache Beam 中 DoFn 的线程同步

    kubernetes - Kubernetes命令在Google Cloud Platform上记录日志以实现PCI合规性

    java - Apache Beam 和 CombineFn 的编​​码器问题

    node.js - 等待 Google Cloud Functions (Node-JS) 中的数据存储区回调

    google-cloud-platform - 将数据流从 Google Cloud Storage 流式传输到 Bigquery

    java - 如何在 Google App Engine 中创建 javax.el.E​​xpressionFactory 以使用 Hibernate Validator

    python - 如何检查 BigQuery 查询结果何时返回零记录?