java - 如何使用 Dataflow 多次执行相同的工作?

标签 java google-cloud-dataflow gcloud

我有一些工作需要重复完成。例如,假设我想掷 2000 个骰子并收集结果。需要注意的是,骰子的 throw 取决于 PCollection 如何使用 Dataflow 来完成此操作?

我尝试使用 PCollectionList,但结果是我的数据流太大而无法启动 (> 10 MB)。以下是我想要执行的操作的示例(使用 PCollectionList):

// I'd like to operate on things 2000 times. 
PCollection<Thing> things = ...;
List<PCollection<ModifiedThing>> modifiedThingsList = new ArrayList<>();
for (int i = 0; i < 2000; ++i) {
   modifiedThingsList.add(things.apply(ParDo.of(thing -> modify(thing)));
}
PCollection<ModifiedThing> modifiedThings = PCollectionList.of(modifiedThingsList).apply(Flatten.pCollections());

由于上图的 JSON 表示对于 Dataflow 来说太大,因此我需要一种不同的方式来表示此逻辑。有任何想法吗?

最佳答案

ParDoFlatMapElements 可以为每个输入返回任意数量的输出。例如:

PCollection<ModifiedThing> modifiedThings = things.apply(
    ParDo.of(new DoFn<Thing, ModifiedThing>() {
  public void processElement(ProcessContext c) {
    for (int i = 0; i < 2000; ++i) {
      c.output(modify(c.element()));
    }
  }
}));

警告:如果您要立即将其他 ParDo 应用于 modifiedThingsbe careful with fusion ,自 2000 年以来是一个相当高的扇出因子。防止融合的一个很好的示例代码片段是 here .

关于java - 如何使用 Dataflow 多次执行相同的工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41390599/

相关文章:

google-app-engine - Google Cloud SDK 0.9.57 版本中的 gcloud 中断了应用程序的部署

java - 使用 Java 编写客户端通过 UDP 连接向服务器发送 HTTP 请求消息

java - 询问倍数权限android 6.0

gcloud - 如何通过 gcloud shell 在 Cloud Run 部署中正确使用来自 Secret Manager 的 secret

linux - 允许 VM 实例上的 http 流量的 gcloud 命令是什么? (这不是创建防火墙规则!)

google-cloud-dataflow - 从 Google Dataflow 访问在 GKE 中运行的 HTTP 服务

java - 如何注入(inject)自定义委托(delegate)人?

java - Android Adapter的convertview未更新

java - 使用 Dataflow API 启动时将参数传递给模板

python-2.7 - 如何在欧洲使用 Cloud Dataflow 区域终端节点?