After reading this question, I still have some questions about how Dataflow / Apache Beam distributes its workload. The problem I ran into can be demonstrated with the following code:
package debug;

import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws IOException {
        /*******************************************
         * SETUP - Build options.
         ********************************************/
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        // Autoscaling will scale between n/15 and n workers, so from 1-15 here
        options.setMaxNumWorkers(15);
        // Default of 250GB is absurdly high and we don't need that much on every worker
        options.setDiskSizeGb(32);
        // Manually configure scaling (i.e. 1 vs 5 for comparison)
        options.setNumWorkers(5);

        // Debug Pipeline
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(PubsubIO.readStrings()
                        .fromSubscription("your subscription"))
                // this is the transform that I actually care about. In production code, this will
                // send a REST request to some 3rd party endpoint.
                .apply("sleep", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws InterruptedException {
                        Thread.sleep(500);
                        c.output(c.element());
                    }
                }));

        pipeline.run();
    }
}
Comparing the maximum throughput with 1 worker versus 5 workers, the latter is only marginally better rather than five times as efficient. That makes me wonder about the following questions:
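A back-of-the-envelope model shows what throughput scaling would look like if each worker processed elements on a fixed number of threads. The threads-per-worker value below is an assumption for illustration only; the real number of harness threads depends on Dataflow's batch/streaming defaults and is configurable (e.g. via the `numberOfWorkerHarnessThreads` debug option).

```java
// Hypothetical throughput model: sleepMs per-element latency, a fixed
// (assumed) number of threads per worker, and 1 vs 5 workers.
public class ThroughputModel {
    public static void main(String[] args) {
        double sleepMs = 500;                    // per-element latency in the DoFn
        double perThreadEps = 1000.0 / sleepMs;  // elements/sec a single thread can do
        int threadsPerWorker = 4;                // assumed value, not a Dataflow default
        for (int workers : new int[] {1, 5}) {
            double maxEps = perThreadEps * threadsPerWorker * workers;
            System.out.printf("%d worker(s): ~%.0f elements/sec max%n", workers, maxEps);
        }
    }
}
```

Under this model 5 workers would give exactly 5x the throughput, so the observed "only slightly higher" result suggests something other than raw worker count is the bottleneck.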
The documentation mentions an asynchronous "job". Does this mean that each DoFn instance is processed asynchronously? In production, the Thread.sleep will be replaced with a synchronous HTTP request to a 3rd-party API. Does the asynchronous processing mean that it turns a synchronous client into an asynchronous one?

UPDATE
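My assumption (to be confirmed by an answer) is that the runner does not turn a blocking client into an asynchronous one: each blocking call simply occupies a processing thread for the full 500 ms, and any overlap of request latency has to be arranged in user code. The self-contained sketch below simulates the difference between issuing the blocking calls sequentially and fanning them out onto an `ExecutorService`; `slowCall` is a stand-in for the 3rd-party REST request and is not part of the pipeline above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingVsOverlapped {
    // Stand-in for the synchronous 3rd-party REST call in the question.
    static String slowCall(String input) throws InterruptedException {
        Thread.sleep(500);
        return input.toUpperCase();
    }

    public static void main(String[] args) throws Exception {
        List<String> elements = List.of("a", "b", "c", "d");

        long t0 = System.nanoTime();
        for (String e : elements) slowCall(e);        // sequential: roughly 4 * 500 ms
        long sequentialMs = (System.nanoTime() - t0) / 1_000_000;

        ExecutorService pool = Executors.newFixedThreadPool(4);
        long t1 = System.nanoTime();
        List<Future<String>> futures = new ArrayList<>();
        for (String e : elements) futures.add(pool.submit(() -> slowCall(e)));
        for (Future<String> f : futures) f.get();     // overlapped: roughly 500 ms total
        long overlappedMs = (System.nanoTime() - t1) / 1_000_000;
        pool.shutdown();

        System.out.println("sequential ms: " + sequentialMs);
        System.out.println("overlapped ms: " + overlappedMs);
    }
}
```

If this assumption holds, a single DoFn thread stuck in a blocking HTTP call contributes nothing to throughput while it waits, which would explain flat scaling.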
One additional question:
The Dataflow documentation has a note on PubSubIO:
In extreme cases (i.e. Cloud Pub/Sub subscriptions with large publishing batches or sinks with very high latency), autoscaling is known to become coarse-grained.
Could you expand on that?
Best answer

Regarding "google-cloud-platform - How does Dataflow autoscale and distribute workload?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51657291/